JMS: How to block several connections from one client to one broker

 IT, Java  Kommentare deaktiviert für JMS: How to block several connections from one client to one broker
Nov 142012
 

Hi,

I had the problem, that it should not be allowed to run 2 instances of the client simultaneously with the same broker.

I fixed that issue by setting the JMS Property ClientID.
When connecting to the broker, you can set a clientID. This ID is then registered on the Broker. If you now run that tool again (while the first instance is still running), then the second instance tries to connect, but the broker does not allow a second instance with the same clientID.

 connection.setClientID("Philipps_Tool");

Code Example:

// Create a JNDI API InitialContext object
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL, server);

jndiContext = new InitialContext(props);
connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
requestQueue = (Destination) jndiContext.lookup("dynamicQueues/" + requestQueueName);
responseQueue = (Destination) jndiContext.lookup("dynamicQueues/"+ responseQueueName);

//Connect to  ActiveMQ
connection = connectionFactory.createConnection();
connection.setClientID("Philipps_Tool");   
// this helps to ensure, that not 2 instances can connect to the broker simultaneously
// because it is not allowed to connect to the same broker with the same clientID
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

JMS Receiver / JMS Consumer with Selector

 IT, Java  Kommentare deaktiviert für JMS Receiver / JMS Consumer with Selector
Nov 142012
 

I had a look for a solution to make request-response pattern work with pre-defined queues and several consumers on the same queue.

On the ActiveMQ web page there is an example on how to use request-response with temporary queues. That means, that each message has its own ReplyTo-Property and this queue which is defined there, is created temporarily.

But I need a solution where the Request and Response queue is predefined and static (not temporarily). There is a solution based on the standard JMS functionality which is called „Selector“. With such an selector you can define a SQL like statement. Only messages that are matching that statement will be retrieved by the receiver. You can use the selector on the JMS Message properties. For more information check the Java API. Check the Interface javax.jms.Session. There you can find the method:

createConsumer(Destination destination, java.lang.String messageSelector);

This is an example statement for a selector:

String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";

That means, that only JMS Messages with JMSType  „philipp“ and a correlation ID which is bigger than 12345 are received by the consumer.

Here you can see the whole class:

/** Philipp Boss
 * 04.07.2012
 */
package com.pbo.jms;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * @author Philipp Boss
 * 
 */
public class JmsReceiverWithSelector {

	// Server where activeMQ is running
	static String server = "localhost";

	// that is the JavaNaming and Directory Interface --> service to find names objects in java
	static Context jndiContext = null;

	// this factory creates the connection to the queues
	static ConnectionFactory connectionFactory = null;

	// that is the connection we use for connecting to the queues
	static Connection connection = null;

	// the JMS Session
	static Session session = null;

	// the queue-name from which the message should be read
	static String queueName = "testQueue";

	// that is the destination from which we want to read the messages
	static Destination destinationQueue = null;

	// that is the Consumer that receives the messages
	static MessageConsumer messageConsumer = null;

	/**
	 * @param args
	 * @throws JMSException
	 */
	public static void main(String[] args) throws JMSException {

		// Create a JNDI API InitialContext object
		Properties props = new Properties();
		props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		props.setProperty(Context.PROVIDER_URL, "tcp://" + server + ":61616");

		// create connection and get the message
		try {
			// get the context from the ActiveMQ Broker
			jndiContext = new InitialContext(props); 
			connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
			destinationQueue = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);

			// Connect to ActiveMQ
			connection = connectionFactory.createConnection();
			connection.start();
			// in the session you can set password, transaction-Mode, etc.
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

			// this consumer listens to the destination queue with a specific filter:
			String jmsSelector = "JMSType = 'philipp' AND JMSCorrelationID > 12345";
			messageConsumer = session.createConsumer(destinationQueue, jmsSelector);

		} catch (NamingException e) {
			System.out.println("Could not create JNDI API context or JNDI API lookup failed: " + e.toString());
			System.exit(1);
		}

		QueueBrowser myEventBrowser = session.createBrowser((Queue) destinationQueue);
		System.out.println("EventQueueBrowser:  has Messages waiting: " + myEventBrowser.getEnumeration().hasMoreElements());

		if (myEventBrowser.getEnumeration().hasMoreElements() == false) {
			// Error Case
			System.out.println("No message is waiting in Event-Queue. I will not do anything.");
			return;
		}
		TextMessage myTxtMessage = (TextMessage) messageConsumer.receive();

		// now we have the text-message.
		// we can extract the standard JMS Header
		System.out.println("Standard JMS Field: Correlation ID = " + myTxtMessage.getJMSCorrelationID());
		System.out.println("Standard JMS Field: JMSMessageID = " + myTxtMessage.getJMSMessageID());
		System.out.println("Standard JMS Field: JMSType = " + myTxtMessage.getJMSType());

		// now we can process the text inside the message
		String msgContent = myTxtMessage.getText();
		System.out.println("Content of Message = " + msgContent);

		System.exit(0);
	}

}

 

 

 

 

JMS Sender Example – ActiveMQ

 IT, Java  Kommentare deaktiviert für JMS Sender Example – ActiveMQ
Nov 142012
 

Here is a very small example for an JMS Message Sender:

I will not separately explain it, please check the comments in the code:

 

/** @author Philipp Boss
 * 04.07.2012
 */
package com.pbo.jms;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * @author Philipp Boss
 * 
 */
public class JmsSender {

	// Server where activeMQ is running
	static String server = "localhost";

	// that is the JavaNaming and Directory Interface --> service to find names objects in java
	static Context jndiContext = null;

	// this factory creates the connection to the queues
	static ConnectionFactory connectionFactory = null;

	// that is the connection we use for connecting to the queues
	static Connection connection = null;

	// the JMS Session
	static Session session = null;

	// the queue-name to which the message is send
	static String queueName = "testQueue";

	// that is the destination from which we want to read the messages
	static Destination destinationQueue = null;

	// that is the Consumer that receives the messages
	static MessageProducer messageProducer = null;

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		System.out.println("Destination: " + queueName);

		// Create a JNDI API InitialContext object
		Properties props = new Properties();
		props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		props.setProperty(Context.PROVIDER_URL, "tcp://localhost:61616");

		try {
			jndiContext = new InitialContext(props);
			connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");
			destinationQueue = (Destination) jndiContext.lookup("dynamicQueues/" + queueName);
		} catch (NamingException e) {
			System.out.println("Could not create JNDI API context or JNDI API lookup failed: " + e.toString());
			System.exit(1);
		}

		// Content of the JMSTextMessage
		String messageContent = "This is the content of the message. you can put here XML, or any other text...";

		try {
			connection = connectionFactory.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			messageProducer = session.createProducer(destinationQueue);
			TextMessage message = session.createTextMessage();

			// set Standard JMS Header Fields
			message.setJMSDeliveryMode(javax.jms.DeliveryMode.PERSISTENT);
			message.setJMSType("TextMessage");
			message.setJMSCorrelationID("12345678");
			// In activeMQ the JMS Message ID is overwritten by activeMQ...I don't know why....
			message.setJMSMessageID("ThisIsMyMsgID");

			// Custom JMS Message properties
			message.setStringProperty("MyExternalUser", "external_philipp");
			message.setIntProperty("MY_INT_NUMBER", 10);

			// set the text to the message
			message.setText(messageContent);
			System.out.println("Sending message: n" + message.getText());
			messageProducer.send(message);
			System.out.println("Message successfully sent");

		} catch (JMSException e) {
			System.out.println("Exception occurred: " + e);
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
				}
			}
		}
		System.exit(0);
	}

}