Saturday, November 30, 2013

How error handling works in Spring Integration

1   Introduction


The target of this post is to show you how errors are handled when using the messaging system with Spring Integration. You will see that error handling is different between synchronous and asynchronous messaging. As usual, I'll skip the chat and proceed with some examples.

You can get the source code at github.


2   The sample application


I will use a basic example, since I want to focus on exception handling. The application consists in an order service, which receives an order, processes it and returns a confirmation.

Below we can see how the messaging system is configured:

int-config.xml
The gateway is the entry point of the messaging system. It will receive the order and send it to the direct channel "requestChannel" where a router will redirect it to the appropriate channel based on the order id:

  • syncChannel: A direct channel that will send the order to an order processor subscribed to this channel.
  • asyncChannel: A queue channel from which the order processor will actively retrieve the order.

Once the order is processed, an order confirmation will be sent back to the gateway. Here is a graphic representing this:

Ok, let's start with the simplest case, synchronous sending using a Direct Channel.


3   Synchronous sending with Direct channel


The order processor is subscribed to the "syncChannel" Direct Channel. The "processOrder" method will be invoked in the sender's thread.

Now, we will implement a test that will provoke an exception by sending an invalid order. This test will send an order to the gateway:


The test:

TestSyncErrorHandling.java

We run the test and see how an exception is raised in the order processor and reaches the test. That's fine; we wanted to validate that sending an invalid order raised an exception. This happened because the test sent the order and blocked waiting for the order to be processed in the same thread. But, what happens when we use an asynchronous channel? Let's continue to the next section.


4   Asynchronous sending with Queue Channel


This section's test sends an order that will be redirected by the router to the queue channel. The gateway is shown below:

Notice that this time the gateway is returning a Future. If we didn't return this, the gateway would block the test thread. By returning Future, the gateway becomes asynchronous and doesn't block the sender's thread.

The test:

TestKoAsyncErrorHandling.java
Ok, so now we are going to launch the test and see the exception raising...

java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException

Oops the test failed because no exception reached the test! What happened? Well, the explanation is below:

Since we are using an asynchronous channel (a queue), the sender sends the order and moves on. Then, the receiver polls the order from a different thread. For this reason, it won't be possible to throw the Exception back to the sender. Let's act like nothing happened then? Well you better not, there are other options.


5   Asynchronous error handling


When using asynchronous messaging, Spring Integration handles exceptions by publishing them to message channels. The exception thrown will be wrapped into a MessagingException and become the payload of the message.

What channel is the error message sent to? First, it will check if the request message contains a header called "errorChannel". If found, the error message will be sent there. Otherwise, the message will be sent to a so-called global error channel.


5.1   Global error channel


By default, a global error channel called "errorChannel" is created by Spring Integration. This channel is a publish-subscribe channel. This means we can subscribe several endpoints to this channel. In fact, there's already an endpoint subscribed to it: a logging handler.This handler will log the payload of messages arriving to the channel, though it can be configured to behave differently.

We will now subscribe a new handler to this global channel and test that it receives the exception message by storing it into a database.

First of all, we will need to change a few things in our configuration. I've created a new file so it doesn't interfere with our previous tests:

int-async-config.xml
The gateway: I've added an error channel. If the invocation fails, the error message will be sent to this channel. If I hadn't defined an error channel, the gateway would have propagated the exception to the caller, but in this case it wouldn't have worked since this is an asynchronous gateway.

The error handler: I've defined a new endpoint that is subscribed to the global error channel. Now, any error message sent to the global error channel will be delivered to our handler.

I've also added a configuration file in order to configure the database. Our error handler will insert received errors to this database:

db-config.xml
The error handler is pretty simple; it receives the error message and inserts its information to the database:

Ok, now is all set. Let's implement a new test:

TestOkAsyncErrorHandlingTest.java
This time the test is successful, the error message has been stored to the database.


5.2   Other mechanisms


Custom error channel: You can define your error channel and define it as a queue channel instead of the default publish-subscribe channel:


ErrorMessageExceptionTypeRouter: This Spring Integration specialized router will resolve the channel where the error message will be sent. It bases its decision on the most specific cause of the error:



6   Conclusion


We have learnt what are the different mechanisms for error handling when using Spring Integration. With this base, you will be able to extend it and configure your error handling by implementing transformers to extract the information from the error message, using header enrichers for setting error channel or implementing your own router among other things.

I'm publishing my new posts on Google plus and Twitter. Follow me if you want to be updated with new content.

6 comments:

  1. Very well written.

    Wanted to point out one typo in the code, lest it cause confusion for others should they implement this locally and change values. Line 39 of TestOkAsyncErrorHandlingTest.java should probably read Assert.assertEquals(orderId, result.get("orderid"));

    Once again very nice post, and thanks for the clarity.

    ReplyDelete
    Replies
    1. Hi Jeff,

      Thank you for your feedback and I appreciate you took the time to let me know about the typo. It has been corrected.

      Delete
  2. Loved this Post.
    I have a question on Spring Integration.
    (1) Receive message from Queue-> Call insert stored proc that does not return any result using stored proc Gateway -> Continue the flow to maybe send a message to MQ or do other stuff.
    Issue here is gateway does not return any reply in case of a insert stored procedure and flow stops at the gateway.
    How can I achive this?
    How can I achec

    ReplyDelete
    Replies
    1. I'm sorry but I do not entirely understand your explanation. What are you trying to accomplish?

      Delete
    2. All I am trying to achieve is save an incoming message to the database using a stored proc outbound gateway and after a succesful insert send a message out to another system via JMS in the same flow.
      I need to capture the return code of my stored procedure.
      Thanks in advance for your reply.

      Delete
    3. Although can't help you too much with stored procedures, I have created an application that uses Spring Integration to call a stored procedure and receive a response. You can review it here: https://github.com/xpadro/spring-integration/tree/master/int-jdbc-stored

      Hope it helps

      Delete