I had a look for a solution to make request-response pattern work with pre-defined queues and several consumers on the same queue.
On the ActiveMQ web page there is an example on how to use request-response with temporary queues. That means, that each message has its own ReplyTo-Property and this queue which is defined there, is created temporarily.
But I need a solution where the Request and Response queue is predefined and static (not temporarily). There is a solution based on the standard JMS functionality which is called „Selector“. With such an selector you can define a SQL like statement. Only messages that are matching that statement will be retrieved by the receiver. You can use the selector on the JMS Message properties. For more information check the Java API. Check the Interface javax.jms.Session. There you can find the method:
createConsumer(Destination destination, java.lang.String messageSelector);
This is an example statement for a selector:
String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";
That means, that only JMS Messages with JMSType „philipp“ and a correlation ID which is bigger than 12345 are received by the consumer.
Here you can see the whole class:
/** Philipp Boss * 04.07.2012 */ package com.pbo.jms; import java.util.Properties; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; /** * @author Philipp Boss * */ public class JmsReceiverWithSelector { // Server where activeMQ is running static String server = "localhost"; // that is the JavaNaming and Directory Interface --> service to find names objects in java static Context jndiContext = null; // this factory creates the connection to the queues static ConnectionFactory connectionFactory = null; // that is the connection we use for connecting to the queues static Connection connection = null; // the JMS Session static Session session = null; // the queue-name from which the message should be read static String queueName = "testQueue"; // that is the destination from which we want to read the messages static Destination destinationQueue = null; // that is the Consumer that receives the messages static MessageConsumer messageConsumer = null; /** * @param args * @throws JMSException */ public static void main(String[] args) throws JMSException { // Create a JNDI API InitialContext object Properties props = new Properties(); props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); props.setProperty(Context.PROVIDER_URL, "tcp://" + server + ":61616"); // create connection and get the message try { // get the context from the ActiveMQ Broker jndiContext = new InitialContext(props); connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory"); destinationQueue = (Destination) jndiContext.lookup("dynamicQueues/" + queueName); // Connect to ActiveMQ connection = connectionFactory.createConnection(); connection.start(); // in the session you can set password, transaction-Mode, etc. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // this consumer listens to the destination queue with a specific filter: String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345"; messageConsumer = session.createConsumer(destinationQueue, jmsSelector); } catch (NamingException e) { System.out.println("Could not create JNDI API context or JNDI API lookup failed: " + e.toString()); System.exit(1); } QueueBrowser myEventBrowser = session.createBrowser((Queue) destinationQueue); System.out.println("EventQueueBrowser: has Messages waiting: " + myEventBrowser.getEnumeration().hasMoreElements()); if (myEventBrowser.getEnumeration().hasMoreElements() == false) { // Error Case System.out.println("No message is waiting in Event-Queue. I will not do anything."); return; } TextMessage myTxtMessage = (TextMessage) messageConsumer.receive(); // now we have the text-message. // we can extract the standard JMS Header System.out.println("Standard JMS Field: Correlation ID = " + myTxtMessage.getJMSCorrelationID()); System.out.println("Standard JMS Field: JMSMessageID = " + myTxtMessage.getJMSMessageID()); System.out.println("Standard JMS Field: JMSType = " + myTxtMessage.getJMSType()); // now we can process the text inside the message String msgContent = myTxtMessage.getText(); System.out.println("Content of Message = " + msgContent); System.exit(0); } }