ActiveMQ asynchronous patterns

In previous post, I made a simple ActiveMQ (Paper-Rock-Scissors-game).

Now we are going tune it up a bit by implementing several small design improvements:

  • Implement Return Address pattern
  • Implement Correlation Identifier pattern
  • Transform the consumer to asynchronous by replacing the method receive() with implementation of the interface MessageListener


At first all those terms maybe sounds complicated but on practice we can implement them by adding just several lines of code.

Source code: paper-rock-scissors-project-2

Implementing Return Address pattern
This pattern simply represents adding a return address to our message, in order our consumer to know where exactly he should send back the response. In ActiveMQ and Java we achieve that by calling setJMSReplyTo(queue_name) on message object. The bold line from the code below is an example from Player_1 (our Producer) class.


MapMessage message = session.createMapMessage();
int guess =  getRandomGuess();
message.setInt("Guess", guess);
message.setJMSReplyTo(player1Queue);
producer.send(message);			

In Player_2 (our Consumer) class we want to use that Return Address to send the response back to our Producer:


Message inMessage = player2Consumer.receive();
MapMessage message;
...               
message = (MapMessage) inMessage;
...
Destination replyDestination = message.getJMSReplyTo();
MessageProducer replyProducer = session.createProducer(replyDestination);
...
replyProducer.send(replyMessage);

Implementing Correlation Identifier pattern

Initially every message has its own message id. Sometimes though, we need to trace back also from which request particular response origins. In those cases we set correlation id to our message object. In ActiveMQ and Java there is simple way to do that. The bold line from the code down below is a representation of this implementation in the Player_2 class.


MapMessage replyMessage = session.createMapMessage();
replyMessage.setInt("Guess", getRandomGuess());
replyMessage.setJMSCorrelationID(message.getJMSMessageID());
replyProducer.send(replyMessage);

Asynchronous message receive

The asynchronous receiving of messages is simply achieved by replacing of


MapMessage reply = (MapMessage) replyConsumer.receive();

with implementation of the interface MessageListener and it’s method onMessage(Message message)


MapMessage reply = (MapMessage) replyConsumer.receive();

public class Player_2 extends Player implements Runnable, MessageListener {
...

public void onMessage(Message message) {
    try {
        if ((message instanceof MapMessage) && (message.getJMSReplyTo() != null)) {
            MapMessage requestMessage = (MapMessage) message;

            Destination replyDestination = requestMessage.getJMSReplyTo();
            MessageProducer replyProducer = session.createProducer(replyDestination);
            MapMessage replyMessage = session.createMapMessage();
            ...
            replyMessage.setJMSCorrelationID(message.getJMSMessageID());

            replyProducer.send(replyMessage);
        }
        else {
            
            message.setJMSCorrelationID(message.getJMSMessageID());
            System.out.println("Invalid message format");
            invalidProducer.send(message);
        
        } 
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

The 3 bold lines in this code example represents a simple Invalid Message Channel implementation. This channel purpose is to receive all invalid messages (e.g. bad message type, missing headers) for better overview and error control.