Nov 142012

I had a look for a solution to make request-response pattern work with pre-defined queues and several consumers on the same queue.

On the ActiveMQ web page there is an example on how to use request-response with temporary queues. That means, that each message has its own ReplyTo-Property and this queue which is defined there, is created temporarily.

But I need a solution where the Request and Response queue is predefined and static (not temporarily). There is a solution based on the standard JMS functionality which is called „Selector“. With such an selector you can define a SQL like statement. Only messages that are matching that statement will be retrieved by the receiver. You can use the selector on the JMS Message properties. For more information check the Java API. Check the Interface javax.jms.Session. There you can find the method:

createConsumer(Destination destination, java.lang.String messageSelector);

This is an example statement for a selector:

String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";

That means, that only JMS Messages with JMSType  „philipp“ and a correlation ID which is bigger than 12345 are received by the consumer.

Here you can see the whole class:

/** Philipp Boss
 * 04.07.2012
package com.pbo.jms;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

 * @author Philipp Boss
public class JmsReceiverWithSelector {

	// Server where activeMQ is running
	static String server = "localhost";

	// that is the JavaNaming and Directory Interface --> service to find names objects in java
	static Context jndiContext = null;

	// this factory creates the connection to the queues
	static ConnectionFactory connectionFactory = null;

	// that is the connection we use for connecting to the queues
	static Connection connection = null;

	// the JMS Session
	static Session session = null;

	// the queue-name from which the message should be read
	static String queueName = "testQueue";

	// that is the destination from which we want to read the messages
	static Destination destinationQueue = null;

	// that is the Consumer that receives the messages
	static MessageConsumer messageConsumer = null;

	 * @param args
	 * @throws JMSException
	public static void main(String[] args) throws JMSException {

		// Create a JNDI API InitialContext object
		Properties props = new Properties();
		props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		props.setProperty(Context.PROVIDER_URL, "tcp://" + server + ":61616");

		// create connection and get the message
		try {
			// get the context from the ActiveMQ Broker
			jndiContext = new InitialContext(props); 
			connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
			destinationQueue = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);

			// Connect to ActiveMQ
			connection = connectionFactory.createConnection();
			// in the session you can set password, transaction-Mode, etc.
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

			// this consumer listens to the destination queue with a specific filter:
			String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";
			messageConsumer = session.createConsumer(destinationQueue, jmsSelector);

		} catch (NamingException e) {
			System.out.println("Could not create JNDI API context or JNDI API lookup failed: " + e.toString());

		QueueBrowser myEventBrowser = session.createBrowser((Queue) destinationQueue);
		System.out.println("EventQueueBrowser:  has Messages waiting: " + myEventBrowser.getEnumeration().hasMoreElements());

		if (myEventBrowser.getEnumeration().hasMoreElements() == false) {
			// Error Case
			System.out.println("No message is waiting in Event-Queue. I will not do anything.");
		TextMessage myTxtMessage = (TextMessage) messageConsumer.receive();

		// now we have the text-message.
		// we can extract the standard JMS Header
		System.out.println("Standard JMS Field: Correlation ID = " + myTxtMessage.getJMSCorrelationID());
		System.out.println("Standard JMS Field: JMSMessageID = " + myTxtMessage.getJMSMessageID());
		System.out.println("Standard JMS Field: JMSType = " + myTxtMessage.getJMSType());

		// now we can process the text inside the message
		String msgContent = myTxtMessage.getText();
		System.out.println("Content of Message = " + msgContent);







Sorry, the comment form is closed at this time.