ActiveMQ and Camel : How to store a Copy of a JMS Message as File

For some testing- and debug scenarios I had the need to store all messages passing the ActiveMQ Broker to the local file system. This enables me to analyse the messages after they were fetched by the consumers. ActiveMQ provides Camel-Support. Camel is a „framework“ for Enterprise Integration Patterns. I struggled a bit with that word, so here just a few capabilities of what camel is and what it supports:

Ok, now how to configure Camel in ActiveMQ:

In the conf/activemq.xml you have to import the camel.xml file:

<import resource="jetty.xml"/>
<import resource="camel.xml"/>

After that, configure the broker in conf/camel.xml. At the end of the file there is a bean id=activemq. Set your broker config there.
I had to change the brokerURL:

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent" >
	<property name="connectionFactory">
	  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
		<property name="userName" value="${activemq.username}"/>
		<property name="password" value="${activemq.password}"/>
	  </bean>
	</property>
</bean>

Now have a look at the sample route in camel.xml:

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
	<!-- You can use Spring XML syntax to define the routes here using the <route> element -->
	<route>
		<description>Example Camel Route</description>
		<from uri="activemq:example.A"/>
		<to uri="activemq:example.B"/>
	</route>
</camelContext>

This route  forwards a message from Queue example.A to example.B. Now add a multicast Route:

<route>
	<from uri="activemq:queue:a"/>
	<multicast>
		<to uri="activemq:queue:b"/>
		<to uri="activemq:queue:c"/>
		<to uri="activemq:queue:d"/>
	</multicast>
</route>

Let’s assume you would like to store Messages as File:

<route>
	<from uri="activemq:queue:a"/>
	<multicast>
		<to uri="activemq:queue:b"/>
		<to uri="activemq:queue:c"/>
		<to uri="activemq:queue:d"/>
		<to uri="file://C:java/apache-activemq-5.8.0/writeMsgToFile?autoCreate=true"/>
	</multicast>
</route>

The <to uri> expression creates a copy of the message as file in the defined directory. The autoCreate=true parameter creates the directory if it is not already there.

Lets assume you would like to set the file name for each message based on Header Parameters or the date when the message is processed:

<route>
	<from uri="activemq:queue:a"/>
	<multicast>
		<to uri="activemq:queue:b"/>
		<to uri="activemq:queue:c"/>
		<to uri="activemq:queue:d"/>
		<to uri="file://C:java/apache-activemq-5.8.0/writeMsgToFile?fileName=${date:now:yyyyMMdd}backup-${in.header.OrderType}-${date:now:hh.mm.ss.SSS}.txt"/>
	</multicast>
</route>

The ${date:now:yyyyMMdd} creates a new directory every day. The expression ${in.header.<CustomProperty>} allows you to use some custom JMS Property. In my case I have a custom property called „OrderType“. So I know by looking the filename which kind of message is stored. The last expression ${date:now:hh.mm.ss.SSS} inserts a timestamd in every filename. This is necessary if it can happen, that severel messages arrive the broker at the same time. If you do add milliseconds (SSS) to the filename, the old file will be overwritten by the new one which arrives the broker at the same second.

That’s a first, simple introduction on camel and ActiveMQ.

Configure ActiveMQ Web console for password secured broker / Queues

In my last post about ActiveMQ I showed how to configure ActiveMQ in order to secure the Queues by Username and Password.

In the last days I noticed, that the web console throws an exception when I try to click on one of the queues. I realized, that the webconsole couldn’t open the queue because it is password protected.

In the logfile there was an exception like this (see the bold words):

2013-08-15 17:09:23,486 | WARN  | Failed to add Connection ID:PBOSS-56722-1376579349017-3:1, reason: java.lang.SecurityException: User name [system] or password is invalid. | org.apache.activemq.broker.TransportConnection | ActiveMQ VMTransport: vm://localhost#1-1
2013-08-15 17:09:23,489 | INFO  | Connector vm://localhost Stopped | org.apache.activemq.broker.TransportConnector | qtp2024674927-37
2013-08-15 17:09:23,490 | WARN  | /admin/browse.jsp | org.eclipse.jetty.servlet.ServletHandler | qtp2024674927-37
org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‚queueBrowser‘ defined in ServletContext resource [/WEB-INF/webconsole-query.xml]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Could not instantiate bean class [org.apache.activemq.web.QueueBrowseQuery]: Constructor threw exception; nested exception is javax.jms.JMSException: User name [system] or password is invalid.

Ok, now we see, that the web console seems to use the username „system“ by default. But in my configuration the queues are secured by „superman“ and „boss“. So I have to „tell“ the web console this information. You can define that in the file: <activemq-home>/conf/credential.properties:

#this is the queue User
activemq.username=supermann
#this is the queue password
activemq.password=boss

Now you have to restart MQ and the web console will work fine 🙂

ActiveMQ Simple Authentication for Consumers and Producers

By default, ActiveMQ does not request a Username or password for consumers and producers. That means, everyone who knows your broker URL is able to connect to your broker and read your messages and/or send messages. For personal development this isn’t a big issue, but for business applications security plays an important role.

Because of that, I have played around with the authentication of consumers and producers on ActiveMQ brokers.

First of all you should know, that ActiveMQ supports a users and groups based authentication.In our example we would like to have two groups:

  • senders: they are allowed to send messages into queues (write access)
  • receivers: they are allowed to consume the messages (read access)

Setup groups and users for Receiver and Consumer:

To setup those two groups and one user for each group you have to add these lines (starting with „plugins“)  to your activemq.xml :

ATTENTION:  in earlier versions (I tested it also with 5.5.1) you have to insert the plugins-element in the correct alphabetical order. See also: http://activemq.apache.org/xml-reference.html –> Alphabetically ordered xml elements (5.4 – 5.5.1)

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

      <plugins>
        <simpleAuthenticationPlugin>
            <users>
                <authenticationUser username="superman" password="boss" groups="senders,receivers,admins" />
                <authenticationUser username="mySendingApp" password="manager" groups="senders" />
                <authenticationUser username="myReceivingApp" password="manager" groups="receivers" />
            </users>
        </simpleAuthenticationPlugin>
      </plugins>

Link groups and users to queues and topics

After that you need to grant rights to the created groups „senders“ , „receivers“ and „admins“. Add the following lines after the closing </simpleAuthenticationPlugin> tag:

<authorizationPlugin>
   <map>
       <authorizationMap>
           <authorizationEntries>
               <authorizationEntry queue=">" write="senders" read="receivers" admin="admins" />
	       <authorizationEntry topic="ActiveMQ.Advisory.>" write="senders" read="receivers" admin="admins,senders,receivers" />
           </authorizationEntries>
       </authorizationMap>
   </map>
</authorizationPlugin>

Explanation: You can specify different access rights for the single queues. In my example I allow all users from group „senders“ to write inside all queues. queue=“>“ stands for all queues. ActiveMQ uses the „>“ as Wildcard (see also the website-link below). Also I allow all receivers to read from the queue.

Maybe you are wondering about the second <authorizationEntry> . ActiveMQ-Clients creating Advisory-Topics for several reasons. For each queue a client connects to, the client tries to create a Advisory-Topic. That is the reason why I added the second line. This line defines that all clients (with correct password and username) are able to create Topics that are named „ActiveMQ.Advisory.*“.  More about ActiveMQs advisory stuff see website link below.

Now you can restart the ActiveMQ broker in order to read the new configuration.

Client side: connect to a username/password protected queue

Now we have to setup the client side to connect to a secured broker. The ConnectionFactory class provides a method called „createConnection“. There is an overloaded method of this: createConnection(String arg1 , String arg2) where arg1 is the username and arg2 is the password.

Here a short example for a listener connecting to a secured broker:

public void startListener() throws NamingException, JMSException, InterruptedException {

	// Create a JNDI API InitialContext object
	Properties props = new Properties();
	props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
	props.setProperty(Context.PROVIDER_URL, "tcp://" + server + ":61616");

	jndiContext = new InitialContext(props);
	connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
	destination = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);

	// Connect to ActiveMQ
	connection = connectionFactory.createConnection(username, password);
	connection.setClientID(clientID);
	// this helps to ensure, that not 2 instances can connect to the broker simultaneously
	// because it is not allowed to connect to the same broker with the same clientID
	connection.start();
	session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

	MessageConsumer subscriber = session.createConsumer(destination);
	subscriber.setMessageListener(new MessageListener() {

		@Override
		public void onMessage(Message msg) {
			try {
				System.out.println("Message arrived by consumer-ID : " + clientID + " CONTENT = " + ((TextMessage) msg).getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	});

	for (int i = 0; i &lt; 100; i++) {
		System.out.println("still alive .... " + i);
		Thread.sleep(10 * 1000);
	}
}

 

Have fun with simple secured JMS 😉

——————————————————————

Sources / Websites:

ActiveMQ Wildcards: http://activemq.apache.org/wildcards.html

ActiveMQ Sample Borker Config for secured JMS Connections: http://activemq.apache.org/complex-single-broker-configuration-stomp-only.html

ActiveMQ Advisory Topics: http://activemq.apache.org/advisory-message.html

Java API for ConnectionFactory:  http://docs.oracle.com/javaee/1.4/api/javax/jms/ConnectionFactory.html

 

JMS: How to block several connections from one client to one broker

Hi,

I had the problem, that it should not be allowed to run 2 instances of the client simultaneously with the same broker.

I fixed that issue by setting the JMS Property ClientID.
When connecting to the broker, you can set a clientID. This ID is then registered on the Broker. If you now run that tool again (while the first instance is still running), then the second instance tries to connect, but the broker does not allow a second instance with the same clientID.

 connection.setClientID("Philipps_Tool");

Code Example:

// Create a JNDI API InitialContext object
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL, server);

jndiContext = new InitialContext(props);
connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
requestQueue = (Destination) jndiContext.lookup("dynamicQueues/" + requestQueueName);
responseQueue = (Destination) jndiContext.lookup("dynamicQueues/"+ responseQueueName);

//Connect to  ActiveMQ
connection = connectionFactory.createConnection();
connection.setClientID("Philipps_Tool");   
// this helps to ensure, that not 2 instances can connect to the broker simultaneously
// because it is not allowed to connect to the same broker with the same clientID
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

JMS Receiver / JMS Consumer with Selector

I had a look for a solution to make request-response pattern work with pre-defined queues and several consumers on the same queue.

On the ActiveMQ web page there is an example on how to use request-response with temporary queues. That means, that each message has its own ReplyTo-Property and this queue which is defined there, is created temporarily.

But I need a solution where the Request and Response queue is predefined and static (not temporarily). There is a solution based on the standard JMS functionality which is called „Selector“. With such an selector you can define a SQL like statement. Only messages that are matching that statement will be retrieved by the receiver. You can use the selector on the JMS Message properties. For more information check the Java API. Check the Interface javax.jms.Session. There you can find the method:

createConsumer(Destination destination, java.lang.String messageSelector);

This is an example statement for a selector:

String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";

That means, that only JMS Messages with JMSType  „philipp“ and a correlation ID which is bigger than 12345 are received by the consumer.

Here you can see the whole class:

/** Philipp Boss
 * 04.07.2012
 */
package com.pbo.jms;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * @author Philipp Boss
 * 
 */
public class JmsReceiverWithSelector {

	// Server where activeMQ is running
	static String server = "localhost";

	// that is the JavaNaming and Directory Interface --> service to find names objects in java
	static Context jndiContext = null;

	// this factory creates the connection to the queues
	static ConnectionFactory connectionFactory = null;

	// that is the connection we use for connecting to the queues
	static Connection connection = null;

	// the JMS Session
	static Session session = null;

	// the queue-name from which the message should be read
	static String queueName = "testQueue";

	// that is the destination from which we want to read the messages
	static Destination destinationQueue = null;

	// that is the Consumer that receives the messages
	static MessageConsumer messageConsumer = null;

	/**
	 * @param args
	 * @throws JMSException
	 */
	public static void main(String[] args) throws JMSException {

		// Create a JNDI API InitialContext object
		Properties props = new Properties();
		props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		props.setProperty(Context.PROVIDER_URL, "tcp://" + server + ":61616");

		// create connection and get the message
		try {
			// get the context from the ActiveMQ Broker
			jndiContext = new InitialContext(props); 
			connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
			destinationQueue = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);

			// Connect to ActiveMQ
			connection = connectionFactory.createConnection();
			connection.start();
			// in the session you can set password, transaction-Mode, etc.
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

			// this consumer listens to the destination queue with a specific filter:
			String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";
			messageConsumer = session.createConsumer(destinationQueue, jmsSelector);

		} catch (NamingException e) {
			System.out.println("Could not create JNDI API context or JNDI API lookup failed: " + e.toString());
			System.exit(1);
		}

		QueueBrowser myEventBrowser = session.createBrowser((Queue) destinationQueue);
		System.out.println("EventQueueBrowser:  has Messages waiting: " + myEventBrowser.getEnumeration().hasMoreElements());

		if (myEventBrowser.getEnumeration().hasMoreElements() == false) {
			// Error Case
			System.out.println("No message is waiting in Event-Queue. I will not do anything.");
			return;
		}
		TextMessage myTxtMessage = (TextMessage) messageConsumer.receive();

		// now we have the text-message.
		// we can extract the standard JMS Header
		System.out.println("Standard JMS Field: Correlation ID = " + myTxtMessage.getJMSCorrelationID());
		System.out.println("Standard JMS Field: JMSMessageID = " + myTxtMessage.getJMSMessageID());
		System.out.println("Standard JMS Field: JMSType = " + myTxtMessage.getJMSType());

		// now we can process the text inside the message
		String msgContent = myTxtMessage.getText();
		System.out.println("Content of Message = " + msgContent);

		System.exit(0);
	}

}