ActiveMQ and Camel : How to store a Copy of a JMS Message as File

Jan 15, 2014
 

For some testing and debugging scenarios I needed to store all messages passing through the ActiveMQ broker on the local file system. This lets me analyse the messages after they have been fetched by the consumers. ActiveMQ ships with Camel support. Camel is a "framework" for Enterprise Integration Patterns. I struggled a bit with that word, so here are just a few capabilities of what Camel is and what it supports:

Ok, now how to configure Camel in ActiveMQ:

In the conf/activemq.xml you have to import the camel.xml file:

<import resource="jetty.xml"/>
<import resource="camel.xml"/>

After that, configure the broker connection in conf/camel.xml. At the end of the file there is a bean with the id "activemq". Set your broker configuration there.
I had to change the brokerURL:

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
	<property name="connectionFactory">
	  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
		<property name="userName" value="${activemq.username}"/>
		<property name="password" value="${activemq.password}"/>
	  </bean>
	</property>
</bean>

Now have a look at the sample route in camel.xml:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
	<!-- You can use Spring XML syntax to define the routes here using the <route> element -->
	<route>
		<description>Example Camel Route</description>
		<from uri="activemq:example.A"/>
		<to uri="activemq:example.B"/>
	</route>
</camelContext>

This route forwards messages from queue example.A to queue example.B. Now add a multicast route, which sends a copy of each message to several targets:

<route>
	<from uri="activemq:queue:a"/>
	<multicast>
		<to uri="activemq:queue:b"/>
		<to uri="activemq:queue:c"/>
		<to uri="activemq:queue:d"/>
	</multicast>
</route>

Let's assume you would like to store the messages as files:

<route>
	<from uri="activemq:queue:a"/>
	<multicast>
		<to uri="activemq:queue:b"/>
		<to uri="activemq:queue:c"/>
		<to uri="activemq:queue:d"/>
		<to uri="file://C:/java/apache-activemq-5.8.0/writeMsgToFile?autoCreate=true"/>
	</multicast>
</route>

The <to uri> expression creates a copy of the message as file in the defined directory. The autoCreate=true parameter creates the directory if it is not already there.
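To make the idea concrete, here is a small plain-Java sketch of what such a multicast with a file target does conceptually. It is not Camel code: the class name, the in-memory lists standing in for queues b and c, and the file name msg-1.txt are all assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Toy model of a multicast with one file target (hypothetical, not Camel's implementation). */
final class MulticastToFileSketch {

    /** Hands the same body to every in-memory "queue" and writes one copy to disk. */
    static Path multicast(String body, String fileName, Path dir, List<List<String>> queues) {
        for (List<String> queue : queues) {
            queue.add(body); // every target receives its own copy of the message
        }
        try {
            Files.createDirectories(dir); // comparable to autoCreate=true
            Path target = dir.resolve(fileName);
            Files.write(target, body.getBytes(StandardCharsets.UTF_8));
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        List<List<String>> queues = new ArrayList<>();
        queues.add(new ArrayList<>()); // stands in for queue b
        queues.add(new ArrayList<>()); // stands in for queue c
        Path dir = Files.createTempDirectory("camel-sketch").resolve("writeMsgToFile");
        Path file = multicast("<order/>", "msg-1.txt", dir, queues);
        System.out.println("Copy written to " + file);
    }
}
```

The point of the sketch is only that the consumers on b and c still get their message, while the file copy is an additional, independent target.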

Let's assume you would like to set the file name of each message based on header properties or on the date when the message is processed:

<route>
	<from uri="activemq:queue:a"/>
	<multicast>
		<to uri="activemq:queue:b"/>
		<to uri="activemq:queue:c"/>
		<to uri="activemq:queue:d"/>
		<to uri="file://C:/java/apache-activemq-5.8.0/writeMsgToFile?fileName=${date:now:yyyyMMdd}/backup-${in.header.OrderType}-${date:now:hh.mm.ss.SSS}.txt"/>
	</multicast>
</route>

The ${date:now:yyyyMMdd} expression creates a new directory every day. The expression ${in.header.<CustomProperty>} lets you use a custom JMS property. In my case I have a custom property called "OrderType", so I can tell from the file name which kind of message is stored. The last expression, ${date:now:hh.mm.ss.SSS}, inserts a timestamp into every file name. This is necessary if several messages can arrive at the broker at the same time. If you do not add milliseconds (SSS) to the file name, the old file will be overwritten by a new one that arrives at the broker within the same second. Note that in Java date patterns hh is the 12-hour clock, so a morning and an afternoon message can still collide; HH (24-hour clock) avoids that.
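The way such a file name is put together can be reproduced in plain Java. The sketch below assumes that the date patterns behave like java.text.SimpleDateFormat patterns and that the date part is meant as a directory, as described above. The class name, the method name and the order type "Invoice" are made up for illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Rebuilds the backup file name outside of Camel (illustrative sketch only). */
final class BackupFileNameSketch {

    /** Returns something like yyyyMMdd/backup-ORDERTYPE-hh.mm.ss.SSS.txt for the given instant. */
    static String fileName(Date now, String orderType, TimeZone zone) {
        SimpleDateFormat day = new SimpleDateFormat("yyyyMMdd", Locale.ROOT);
        SimpleDateFormat time = new SimpleDateFormat("hh.mm.ss.SSS", Locale.ROOT);
        day.setTimeZone(zone);
        time.setTimeZone(zone);
        return day.format(now) + "/backup-" + orderType + "-" + time.format(now) + ".txt";
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        // Two messages that arrive one millisecond apart get different names
        System.out.println(fileName(new Date(0L), "Invoice", utc));
        System.out.println(fileName(new Date(1L), "Invoice", utc));
    }
}
```

Without the SSS part both calls in main would produce the same name, which is exactly the overwrite case described above.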

That's a first, simple introduction to Camel and ActiveMQ.

ActiveMQ Simple Authentication for Consumers and Producers

Jul 3, 2013
 

By default, ActiveMQ does not ask consumers and producers for a username or password. That means that everyone who knows your broker URL can connect to your broker and read and/or send messages. For personal development this isn't a big issue, but for business applications security plays an important role.

Because of that, I have played around with the authentication of consumers and producers on ActiveMQ brokers.

First of all, you should know that ActiveMQ supports authentication based on users and groups. In our example we would like to have two groups:

  • senders: they are allowed to send messages into queues (write access)
  • receivers: they are allowed to consume the messages (read access)

Set up groups and users for senders and receivers:

To set up those two groups and one user for each group, you have to add these lines (starting with "plugins") to your activemq.xml:

ATTENTION: in earlier versions (I also tested it with 5.5.1) you have to insert the plugins element in the correct alphabetical order. See also: http://activemq.apache.org/xml-reference.html -> Alphabetically ordered xml elements (5.4 – 5.5.1)

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

      <plugins>
        <simpleAuthenticationPlugin>
            <users>
                <authenticationUser username="superman" password="boss" groups="senders,receivers,admins" />
                <authenticationUser username="mySendingApp" password="manager" groups="senders" />
                <authenticationUser username="myReceivingApp" password="manager" groups="receivers" />
            </users>
        </simpleAuthenticationPlugin>
      </plugins>

Link groups and users to queues and topics

After that you need to grant rights to the created groups "senders", "receivers" and "admins". Add the following lines after the closing </simpleAuthenticationPlugin> tag:

<authorizationPlugin>
   <map>
       <authorizationMap>
           <authorizationEntries>
               <authorizationEntry queue=">" write="senders" read="receivers" admin="admins" />
               <authorizationEntry topic="ActiveMQ.Advisory.>" write="senders" read="receivers" admin="admins,senders,receivers" />
           </authorizationEntries>
       </authorizationMap>
   </map>
</authorizationPlugin>

Explanation: You can specify different access rights for individual queues. In my example I allow all users from the group "senders" to write to all queues. queue=">" stands for all queues; ActiveMQ uses ">" as a wildcard (see the website link below). I also allow all receivers to read from all queues.
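To get a feeling for how such a wildcard pattern selects destinations, here is a small matcher in plain Java. It is a simplified sketch and not ActiveMQ's own implementation: names are split at the dots, ">" matches everything from that point on, and "*" (described on the wildcards page, not used in my config) matches exactly one name segment. Edge cases are not meant to mirror the broker exactly, and the class and queue names are hypothetical.

```java
/** Simplified destination wildcard matcher (illustrative, not the broker's code). */
final class DestinationWildcardSketch {

    /** Returns true if the dot-separated destination name is covered by the pattern. */
    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true; // everything from here on matches
            }
            if (i >= d.length) {
                return false; // destination has fewer segments than the pattern
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal segment differs
            }
        }
        return p.length == d.length; // no trailing unmatched segments allowed
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "orders.incoming"));
        System.out.println(matches("ActiveMQ.Advisory.>", "ActiveMQ.Advisory.Consumer.Queue.orders"));
        System.out.println(matches("ActiveMQ.Advisory.>", "orders.incoming"));
    }
}
```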

Maybe you are wondering about the second <authorizationEntry>. ActiveMQ clients create advisory topics for several reasons. For each queue a client connects to, the client tries to create an advisory topic. That is the reason why I added the second line. It defines that all clients (with a correct username and password) are able to create topics whose names start with "ActiveMQ.Advisory.". For more about ActiveMQ's advisory topics, see the website link below.

Now you can restart the ActiveMQ broker in order to read the new configuration.

Client side: connect to a username/password protected queue

Now we have to set up the client side to connect to a secured broker. The ConnectionFactory interface provides a method called "createConnection". There is an overloaded variant, createConnection(String userName, String password), which takes the username and the password.

Here is a short example of a listener connecting to a secured broker:

public void startListener() throws NamingException, JMSException, InterruptedException {

	// Create a JNDI API InitialContext object
	Properties props = new Properties();
	props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
	props.setProperty(Context.PROVIDER_URL, "tcp://" + server + ":61616");

	jndiContext = new InitialContext(props);
	connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
	destination = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);

	// Connect to ActiveMQ
	connection = connectionFactory.createConnection(username, password);
	connection.setClientID(clientID);
	// this helps to ensure, that not 2 instances can connect to the broker simultaneously
	// because it is not allowed to connect to the same broker with the same clientID
	connection.start();
	session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

	MessageConsumer subscriber = session.createConsumer(destination);
	subscriber.setMessageListener(new MessageListener() {

		@Override
		public void onMessage(Message msg) {
			try {
				System.out.println("Message arrived by consumer-ID : " + clientID + " CONTENT = " + ((TextMessage) msg).getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	});

	for (int i = 0; i < 100; i++) {
		System.out.println("still alive .... " + i);
		Thread.sleep(10 * 1000);
	}
}

 

Have fun with simple secured JMS 😉

——————————————————————

Sources / Websites:

ActiveMQ Wildcards: http://activemq.apache.org/wildcards.html

ActiveMQ Sample Broker Config for secured JMS Connections: http://activemq.apache.org/complex-single-broker-configuration-stomp-only.html

ActiveMQ Advisory Topics: http://activemq.apache.org/advisory-message.html

Java API for ConnectionFactory:  http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html

 

JMS: How to block several connections from one client to one broker

Nov 14, 2012
 

Hi,

I had the problem that running two instances of the client simultaneously against the same broker should not be allowed.

I fixed that issue by setting the JMS client ID.
When connecting to the broker, you can set a clientID. This ID is then registered on the broker. If you now run the tool again while the first instance is still running, the second instance tries to connect, but the broker does not allow a second connection with the same clientID.

 connection.setClientID("Philipps_Tool");

Code Example:

// Create a JNDI API InitialContext object
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL, server);

jndiContext = new InitialContext(props);
connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
requestQueue = (Destination) jndiContext.lookup("dynamicQueues/" + requestQueueName);
responseQueue = (Destination) jndiContext.lookup("dynamicQueues/"+ responseQueueName);

//Connect to  ActiveMQ
connection = connectionFactory.createConnection();
connection.setClientID("Philipps_Tool");   
// this helps to ensure, that not 2 instances can connect to the broker simultaneously
// because it is not allowed to connect to the same broker with the same clientID
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
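The broker-side behaviour can be pictured as a registry of active client IDs. The following toy model in plain Java is only an illustration of that idea and is not how ActiveMQ is implemented. With a real broker the rejection typically surfaces on the client as a JMSException (javax.jms.InvalidClientIDException). The class and method names are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of a broker that allows each client ID only once at a time. */
final class ClientIdRegistrySketch {

    // thread-safe set of the client IDs that are currently connected
    private final Set<String> activeClientIds = ConcurrentHashMap.newKeySet();

    /** Returns true if the ID was free and is now registered, false if it is already in use. */
    boolean connect(String clientId) {
        return activeClientIds.add(clientId);
    }

    /** Frees the ID again, e.g. when the first instance closes its connection. */
    void disconnect(String clientId) {
        activeClientIds.remove(clientId);
    }

    public static void main(String[] args) {
        ClientIdRegistrySketch broker = new ClientIdRegistrySketch();
        System.out.println("first instance accepted: " + broker.connect("Philipps_Tool"));
        System.out.println("second instance accepted: " + broker.connect("Philipps_Tool"));
    }
}
```

As the model suggests, the ID becomes available again once the first instance disconnects, so closing the connection cleanly matters.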

JMS Receiver / JMS Consumer with Selector

Nov 14, 2012
 

I was looking for a way to make the request-response pattern work with predefined queues and several consumers on the same queue.

On the ActiveMQ web page there is an example of how to use request-response with temporary queues. That means that each message carries its own ReplyTo property, and the queue named there is created temporarily.

But I need a solution where the request and response queues are predefined and static (not temporary). There is a solution based on standard JMS functionality called a "selector". With a selector you can define an SQL-like statement. Only messages that match that statement will be retrieved by the receiver. You can use the selector on the JMS message headers and properties. For more information, check the Java API for the interface javax.jms.Session. There you can find the method:

createConsumer(Destination destination, java.lang.String messageSelector);

This is an example statement for a selector:

String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";

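That means that only JMS messages with JMSType "philipp" and a correlation ID greater than 12345 are received by the consumer. Note that JMSCorrelationID is a String header and the JMS selector syntax only defines = and <> for comparing strings, so a broker may not evaluate the numeric comparison as intended; a numeric custom property is the safer choice for range checks.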
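The intended meaning of this selector can be written down as an ordinary Java predicate. The sketch below runs without a broker and only models the filter logic. Reading the correlation ID as a number is an assumption of this sketch (in JMS it is a String header), and the Msg class is a hypothetical stand-in for a JMS message.

```java
import java.util.ArrayList;
import java.util.List;

/** Models the selector "JMSType = 'philipp' AND JMSCorrelationID > 12345" as plain Java. */
final class SelectorSketch {

    /** Minimal stand-in for a JMS message with the two header fields the selector uses. */
    static final class Msg {
        final String type;
        final String correlationId;

        Msg(String type, String correlationId) {
            this.type = type;
            this.correlationId = correlationId;
        }
    }

    /** Returns true if the message would pass the filter as it is intended to work. */
    static boolean selected(Msg m) {
        if (!"philipp".equals(m.type) || m.correlationId == null) {
            return false;
        }
        try {
            return Long.parseLong(m.correlationId) > 12345L; // assumption: ID holds a number
        } catch (NumberFormatException e) {
            return false; // non-numeric IDs never match
        }
    }

    /** Keeps only the messages a consumer with this selector would receive. */
    static List<Msg> filter(List<Msg> all) {
        List<Msg> result = new ArrayList<>();
        for (Msg m : all) {
            if (selected(m)) {
                result.add(m);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Msg> queue = new ArrayList<>();
        queue.add(new Msg("philipp", "12346"));
        queue.add(new Msg("TextMessage", "12345678"));
        System.out.println("messages delivered: " + filter(queue).size());
    }
}
```

Messages that do not match stay on the queue for other consumers, which is what makes several consumers on one static queue possible.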

Here you can see the whole class:

/** Philipp Boss
 * 04.07.2012
 */
package com.pbo.jms;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * @author Philipp Boss
 * 
 */
public class JmsReceiverWithSelector {

	// Server where activeMQ is running
	static String server = "localhost";

	// the Java Naming and Directory Interface (JNDI) context, used to look up named objects
	static Context jndiContext = null;

	// this factory creates the connection to the queues
	static ConnectionFactory connectionFactory = null;

	// that is the connection we use for connecting to the queues
	static Connection connection = null;

	// the JMS Session
	static Session session = null;

	// the queue-name from which the message should be read
	static String queueName = "testQueue";

	// that is the destination from which we want to read the messages
	static Destination destinationQueue = null;

	// that is the Consumer that receives the messages
	static MessageConsumer messageConsumer = null;

	/**
	 * @param args
	 * @throws JMSException
	 */
	public static void main(String[] args) throws JMSException {

		// Create a JNDI API InitialContext object
		Properties props = new Properties();
		props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		props.setProperty(Context.PROVIDER_URL, "tcp://" + server + ":61616");

		// create connection and get the message
		try {
			// get the context from the ActiveMQ Broker
			jndiContext = new InitialContext(props); 
			connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
			destinationQueue = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);

			// Connect to ActiveMQ
			connection = connectionFactory.createConnection();
			connection.start();
			// the session defines the transaction mode and the acknowledge mode
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

			// this consumer listens to the destination queue with a specific filter:
			String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";
			messageConsumer = session.createConsumer(destinationQueue, jmsSelector);

		} catch (NamingException e) {
			System.out.println("Could not create JNDI API context or JNDI API lookup failed: " + e.toString());
			System.exit(1);
		}

		QueueBrowser myEventBrowser = session.createBrowser((Queue) destinationQueue);
		System.out.println("EventQueueBrowser:  has Messages waiting: " + myEventBrowser.getEnumeration().hasMoreElements());

		if (myEventBrowser.getEnumeration().hasMoreElements() == false) {
			// Error Case
			System.out.println("No message is waiting in Event-Queue. I will not do anything.");
			return;
		}
		TextMessage myTxtMessage = (TextMessage) messageConsumer.receive();

		// now we have the text-message.
		// we can extract the standard JMS Header
		System.out.println("Standard JMS Field: Correlation ID = " + myTxtMessage.getJMSCorrelationID());
		System.out.println("Standard JMS Field: JMSMessageID = " + myTxtMessage.getJMSMessageID());
		System.out.println("Standard JMS Field: JMSType = " + myTxtMessage.getJMSType());

		// now we can process the text inside the message
		String msgContent = myTxtMessage.getText();
		System.out.println("Content of Message = " + msgContent);

		System.exit(0);
	}

}

 

 

 

 

JMS Sender Example – ActiveMQ

Nov 14, 2012
 

Here is a very small example of a JMS message sender:

I will not explain it separately; please check the comments in the code:

 

/** @author Philipp Boss
 * 04.07.2012
 */
package com.pbo.jms;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * @author Philipp Boss
 * 
 */
public class JmsSender {

	// Server where activeMQ is running
	static String server = "localhost";

	// the Java Naming and Directory Interface (JNDI) context, used to look up named objects
	static Context jndiContext = null;

	// this factory creates the connection to the queues
	static ConnectionFactory connectionFactory = null;

	// that is the connection we use for connecting to the queues
	static Connection connection = null;

	// the JMS Session
	static Session session = null;

	// the queue-name to which the message is sent
	static String queueName = "testQueue";

	// that is the destination to which we want to send the messages
	static Destination destinationQueue = null;

	// that is the Producer that sends the messages
	static MessageProducer messageProducer = null;

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		System.out.println("Destination: " + queueName);

		// Create a JNDI API InitialContext object
		Properties props = new Properties();
		props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		props.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");

		try {
			jndiContext = new InitialContext(props);
			connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
			destinationQueue = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);
		} catch (NamingException e) {
			System.out.println("Could not create JNDI API context or JNDI API lookup failed: " + e.toString());
			System.exit(1);
		}

		// Content of the JMSTextMessage
		String messageContent = "This is the content of the message. you can put here XML, or any other text...";

		try {
			connection = connectionFactory.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			messageProducer = session.createProducer(destinationQueue);
			TextMessage message = session.createTextMessage();

			// set Standard JMS Header Fields
			// the delivery mode is taken from the producer on send; a value set on the message itself is ignored
			messageProducer.setDeliveryMode(javax.jms.DeliveryMode.PERSISTENT);
			message.setJMSType("TextMessage");
			message.setJMSCorrelationID("12345678");
			// the JMS provider assigns the JMSMessageID during send(), so this value is overwritten
			message.setJMSMessageID("ThisIsMyMsgID");

			// Custom JMS Message properties
			message.setStringProperty("MyExternalUser", "external_philipp");
			message.setIntProperty("MY_INT_NUMBER", 10);

			// set the text to the message
			message.setText(messageContent);
			System.out.println("Sending message: \n" + message.getText());
			messageProducer.send(message);
			System.out.println("Message successfully sent");

		} catch (JMSException e) {
			System.out.println("Exception occurred: " + e);
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
				}
			}
		}
		System.exit(0);
	}

}