Tuesday, August 27, 2013

Spring JMS: Processing messages within transactions

1 Introduction

This post shows how an error in the consumer during the asynchronous reception of messages with JMS can lead to lost messages. I will then explain how you can solve this problem using local transactions.

You will also see that this solution can, in some cases, cause message duplication (for example, when the listener saves the message to the database and then its execution fails). This happens because the JMS transaction is independent of other transactional resources such as the DB. If your processing is not idempotent, or if your application does not support duplicate message detection, you will have to use distributed transactions.

Distributed transactions are beyond the scope of this post. If you are interested in handling distributed transactions, you can read this interesting article.

I've implemented a test application that reproduces the following cases:

  1. Sending and reception of a message: The producer sends the message to a queue. The consumer retrieves the message from the queue and processes it, storing it to a database.
  2. Error occurred before message processing: The consumer retrieves the message but the execution fails before storing it to the DB.
  3. Error occurred after processing the message: The consumer retrieves the message, stores it to the DB, and then the execution fails.

The source code for this application can be found on GitHub.


2   The test application

The test application executes two test classes, TestNotTransactedMessaging and TestTransactedMessaging. Both classes execute the three cases described above.

Let's see the configuration of the application when it is executed without transactions.

app-config.xml
Application configuration. Basically it checks within the indicated packages to autodetect the application beans: producer and consumer. It also configures the in-memory database where processed notifications will be stored.
notx-jms-config.xml
Configures the JMS infrastructure, which is:
  • Broker connection
  • The JmsTemplate
  • Queue where notifications will be sent
  • The listener container that will send notifications to the listener to process them

The producer simply uses the jmsTemplate to send notifications.
The listener is responsible for retrieving notifications from the queue and storing them to the database.
The checkPreprocessException method will throw a runtime exception when a notification with id=1 arrives. In this way, we cause an error before the message is stored to the DB.

The checkPostprocessException method will throw an exception if a notification with id=2 arrives, thereby causing an error just after storing it to the DB.

The getDeliveryNumber method returns the number of times the message has been delivered. This only matters within transactions, since the broker will try to resend the message after a listener processing failure has led to a rollback.

Finally, the saveToDB method is pretty obvious. It stores a notification to the DB.
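
As a rough illustration of the flow just described, here is a minimal plain-Java sketch. It is not the application's actual listener: the class name NotificationProcessorSketch and the onNotification and savedNotifications methods are hypothetical, JMS types and the delivery count are left out, and an in-memory list stands in for the database. The exception messages are the ones that appear in the test output further down.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the listener flow described above. JMS types are left
// out, and an in-memory list stands in for the database (both assumptions).
class NotificationProcessorSketch {

    private final List<Integer> db = new ArrayList<>();

    // Called once per received notification id.
    void onNotification(int id) {
        checkPreprocessException(id);   // may fail before anything is stored
        saveToDB(id);
        checkPostprocessException(id);  // may fail after the notification is stored
    }

    private void checkPreprocessException(int id) {
        if (id == 1) {
            throw new RuntimeException("error after receiving message");
        }
    }

    private void checkPostprocessException(int id) {
        if (id == 2) {
            throw new RuntimeException("error after processing message");
        }
    }

    private void saveToDB(int id) {
        db.add(id);
    }

    int savedNotifications() {
        return db.size();
    }

    public static void main(String[] args) {
        NotificationProcessorSketch processor = new NotificationProcessorSketch();
        for (int id = 0; id <= 2; id++) {
            try {
                processor.onNotification(id);
            } catch (RuntimeException e) {
                System.out.println("Id " + id + " failed: " + e.getMessage());
            }
        }
        System.out.println("Total items in DB: " + processor.savedNotifications());
    }
}
```

The order of the three calls in onNotification is what matters here: a failure for id=1 leaves nothing stored, while a failure for id=2 happens after the notification is already saved.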

You can always check the source code of this application by clicking the link at the beginning of this article.


3   Testing message reception without transactions

I will launch two test classes, one without transactions and the other within a local transaction. Both classes extend a base class that loads the common application context and contains some utility methods:
The utility methods are explained below:

  • getSavedNotifications: Returns the number of notifications stored to the DB. I've used the queryForObject method because it is the recommended approach since Spring 3.2.2, where the queryForInt method was deprecated.
  • getMessagesInQueue: Allows you to check which messages are still pending in the specified queue. For this test we are interested in knowing how many notifications are still waiting to be processed.

Now, let me show you the code for the first test (TestNotTransactedMessaging). This test launches the 3 cases indicated at the beginning of the article.

4   Executing the test

Ok, let's execute the test and see what the results are:

testCorrectMessage output:
Producer|Sending notification | Id: 0
NotificationProcessor|Received notification | Id: 0 | Redelivery: 1
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 1

No problem here, the queue is empty since the message has been correctly received and stored to the database.

testFailedAfterReceiveMessage output:
Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestNotTransactedMessaging|Total items in "incoming" queue: 0
TestNotTransactedMessaging|Total items in DB: 0

Since the listener is executing outside a transaction, the acknowledge mode (auto by default) applies. This means that the message is considered successfully delivered once the onMessage method is invoked, and it is therefore deleted from the queue. Because the listener failed before storing the message to the DB, we have lost the message!
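
The loss described above can be reproduced with a toy in-memory model. This is only a sketch under stated assumptions: AutoAckSketch and deliverNext are hypothetical names, the "broker" is a plain queue, and none of this uses the JMS API. It only mirrors the ordering problem: the message leaves the queue first, and the listener runs afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy in-memory model, not the JMS API: illustrates why an acknowledged
// message cannot be recovered when the listener fails afterwards.
class AutoAckSketch {

    final Deque<Integer> queue = new ArrayDeque<>();  // pending notification ids
    final List<Integer> db = new ArrayList<>();       // stands in for the database

    // Delivers one message the way the non-transacted setup behaves:
    // the message is removed from the queue, then the listener is invoked.
    void deliverNext(Consumer<Integer> listener) {
        Integer id = queue.poll();  // removed (acknowledged) up front
        if (id == null) {
            return;
        }
        try {
            listener.accept(id);
        } catch (RuntimeException e) {
            // The failure is only reported; nothing puts the message back.
            System.out.println("Execution of listener failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        AutoAckSketch broker = new AutoAckSketch();
        broker.queue.add(1);

        broker.deliverNext(id -> {
            if (id == 1) {
                throw new RuntimeException("error after receiving message");
            }
            broker.db.add(id);
        });

        System.out.println("Total items in queue: " + broker.queue.size());
        System.out.println("Total items in DB: " + broker.db.size());
    }
}
```

After the failed delivery, the notification is neither in the queue nor in the list standing in for the DB, which is the situation shown in the test output.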

testFailedAfterProcessingMessage output:
2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2
2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0
2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1

In this case, the message has been deleted from the queue (AUTO_ACKNOWLEDGE) and stored to the DB before the execution failed.


5   Adding local transactions

Usually we can't afford to lose messages as in the second case of the test, so we will invoke the listener within a local transaction. The change is pretty simple and does not require modifying a single line of code in our application. We only need to change the configuration file.

To test the 3 cases with transactions, I will replace the configuration file notx-jms-config.xml with the following:

tx-jms-config.xml
First, I've added the number of re-deliveries made in case of a rollback (caused by an error in the listener execution):
Next, I indicate that the listener will be executed within a transaction. This can be done by modifying the listener container definition:
This will cause every invocation of the listener to be executed within a local JMS transaction. The transaction will start when the message is received. If the listener execution fails, message reception will be rolled back.

And that's all we have to change. Let's launch the tests with this configuration.


6   Testing message reception within transactions

The code from the TestTransactedMessaging class is practically the same as the previous test. The only difference is that it adds a query to the DLQ (dead letter queue). When executed within transactions, if the message reception is rolled back, the broker will send the message to this queue once all redeliveries have failed.

I'm skipping the output of the successful case, as it does not add anything new.

testFailedAfterReceiveMessage output:
Producer|Sending notification | Id: 1
NotificationProcessor|Received notification | Id: 1 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 2
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
...
java.lang.RuntimeException: error after receiving message
NotificationProcessor|Received notification | Id: 1 | Redelivery: 5
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after receiving message
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 1
TestTransactedMessaging|Total items in DB: 0

As you can see, the first delivery failed, and the broker tried to resend the message four more times (as indicated in the maximumRedeliveries property). Since the failure persisted, the message was sent to the DLQ. In this way, we do not lose the message.
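
The redelivery behaviour seen in this output can be sketched with another toy in-memory model. Again, this is an illustration under stated assumptions and not the broker's implementation: TransactedDeliverySketch and deliverNext are hypothetical names, and the only thing borrowed from the configuration is the meaning of maximumRedeliveries (one initial delivery plus that many retries).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BiConsumer;

// Toy in-memory model of a transacted receive with a redelivery limit.
// It does not use the JMS API; it only mirrors the rollback and DLQ flow.
class TransactedDeliverySketch {

    final Deque<Integer> queue = new ArrayDeque<>();
    final Deque<Integer> deadLetterQueue = new ArrayDeque<>();
    final int maximumRedeliveries;

    TransactedDeliverySketch(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    // Delivers the head message and returns the number of attempts made.
    // The listener receives the message id and the delivery number.
    int deliverNext(BiConsumer<Integer, Integer> listener) {
        Integer id = queue.peek();  // the message stays queued until commit
        if (id == null) {
            return 0;
        }
        int maxAttempts = 1 + maximumRedeliveries;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(id, attempt);
                queue.poll();       // commit: the reception becomes final
                return attempt;
            } catch (RuntimeException e) {
                // rollback: the message is still at the head of the queue
            }
        }
        deadLetterQueue.add(queue.poll());  // redeliveries exhausted
        return maxAttempts;
    }

    public static void main(String[] args) {
        TransactedDeliverySketch broker = new TransactedDeliverySketch(4);
        broker.queue.add(1);

        broker.deliverNext((id, attempt) -> {
            System.out.println("Received notification | Id: " + id + " | Redelivery: " + attempt);
            throw new RuntimeException("error after receiving message");
        });

        System.out.println("Total items in queue: " + broker.queue.size());
        System.out.println("Total items in dead letter queue: " + broker.deadLetterQueue.size());
    }
}
```

The key difference from the non-transacted case is that the message is only removed from the queue after the listener returns normally, so a failure leaves it available for another attempt or, eventually, for the dead letter queue.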

testFailedAfterProcessingMessage output:
Producer|Sending notification | Id: 2
NotificationProcessor|Received notification | Id: 2 | Redelivery: 1
AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set.
java.lang.RuntimeException: error after processing message
NotificationProcessor|Received notification | Id: 2 | Redelivery: 2
TestTransactedMessaging|Total items in "incoming" queue: 0
TestTransactedMessaging|Total items in "dead letter" queue: 0
TestTransactedMessaging|Total items in DB: 2

In this case, this is what happened:
  1. The listener retrieved the message
  2. It stored the message to the DB
  3. Listener execution failed
  4. The broker resent the message. Since the error did not occur again, the listener stored the message to the DB a second time. The message has been duplicated.

7   Conclusion

Adding local transactions to the message reception avoids losing messages. What we have to take into account is that duplicate messages can occur, so either our listener has to detect them or our processing has to be idempotent, so that handling the same message again causes no problem. If this is not possible, we will have to go for distributed transactions, since they support transactions that involve different resources.
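
To make the duplicate detection idea concrete, here is a minimal plain-Java sketch. It is not part of the test application: DeduplicatingProcessorSketch and its methods are hypothetical names, and in-memory collections stand in for the database. It assumes that each notification carries a unique id that is preserved across redeliveries.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of a listener that ignores notifications it has already stored.
// In-memory collections stand in for the database (an assumption).
class DeduplicatingProcessorSketch {

    private final Set<Integer> processedIds = new HashSet<>();
    private final List<Integer> db = new ArrayList<>();

    // Returns true if the notification was stored, false if it was a duplicate.
    boolean process(int notificationId) {
        if (!processedIds.add(notificationId)) {
            return false;  // already handled on an earlier delivery
        }
        db.add(notificationId);
        return true;
    }

    int savedNotifications() {
        return db.size();
    }

    public static void main(String[] args) {
        DeduplicatingProcessorSketch processor = new DeduplicatingProcessorSketch();

        // First delivery stores the notification; the listener then fails
        // elsewhere and the broker delivers the same notification again.
        System.out.println("First delivery stored: " + processor.process(2));
        System.out.println("Redelivery stored: " + processor.process(2));
        System.out.println("Total items in DB: " + processor.savedNotifications());
    }
}
```

In a real application the record of processed ids would have to be kept in the same database, and written in the same database transaction as the notification itself. Otherwise a crash between the two writes would bring the problem back.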

8 comments:

  1. How do the messages get stored into the DB? If there is a JVM crash, how would the messages that were present in the queue be persisted? Can you help?

    Replies
    1. The NotificationProcessor is responsible for storing messages into the DB (check section 3). Regarding the loss of messages, it depends on the acknowledgement mode.

  2. A very, very good article (and likewise the one on basic JMS with Spring). Loved the TDD approach. Thank you.

    Replies
    1. Thank you! I appreciate your kind words, and I'm glad you found this article interesting.

  3. Thanks for a great article. Helped me understand local transactions better.

    Replies
    1. Fantastic, if the post helped you then it accomplished its goal :)

  4. Instead of distributed transactions (which would be the best way to do it), isn't it possible to have 2 "local" transactions (1 JMS and 1 JDBC) and make them both roll back in case of failure? (i.e. adding @Transactional)

    Replies
    1. @Transactional will only be bound to a specified transaction manager, which in case of local transactions will affect a single resource (JMS or JDBC).
