Cloud-ready messaging with Spring Integration: Taming transactional integration
Developing transactional Java applications nowadays is as easy as applying annotations to your business layer interfaces, with the Spring framework taking care of propagating the transaction to the resources in use, as needed. This is usually achieved by means of thread-local Java objects representing the underlying physical transaction, so the developer can focus on the business logic rather than on the scope of the underlying transaction. This also applies in situations where multiple transactional resources are involved. In that case, distributed (XA) transactions must be used, assuming the resources involved support so-called two-phase commits (2PC). The problem, amongst others, is that XA transactions do not scale, which is a no-go when developing applications for the cloud, whose main promise is unlimited scalability. That’s why cloud providers rarely offer XA support, if at all. In this blog post I will explain how to avoid the so-called transactional integration that XA encourages, using Spring Integration.
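To recap the programming model this post builds on, here is a minimal sketch of declarative transaction management in Spring XML; the bean names and the referenced dataSource are illustrative assumptions:

```xml
<!-- assumes the "tx" namespace and a dataSource bean are declared -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>

<!-- any bean method annotated with @Transactional now runs within a
     transaction that Spring binds to the current thread for the
     duration of the call -->
<tx:annotation-driven transaction-manager="transactionManager"/>
```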
The downside of distributed transactions
Distributed transaction models, such as XA/2PC, are often considered the easiest, if not the only, way to ensure ACID properties across multiple resources. I was facing this very issue with one of our customers, where we had to build an application relying on a relational database for data storage and on JMS for communicating with other applications. At first, both resources turned out to support XA transactions. Thus, I picked Atomikos to deal with the problem, for the sake of not introducing a full-blown JEE server. It was all too easy: I was able to receive any number of JMS messages from (multiple) queue(s), process them with respect to the data store, and send out new messages without any inconsistencies. Moreover, I was not faced with duplicate message detection or idempotent consumer semantics, since XA deals well with that, as long as the message-producing system behaves appropriately.
Why distributed transaction models fail in the cloud
All was fine until the day I found out that popular cloud providers (Microsoft Azure in my case) do not provide XA transactions, if they provide any transactional resources at all. I soon stumbled across this post from Clemens Vasters, explaining why distributed transactions fail in the cloud. His key points are:
- Distributed transactions were designed for physical single-node systems, or small clusters of physical nodes.
- Two-phase commits (2PC) strongly rely on a highly available central transaction coordinator, both in terms of availability and network connectivity. This can only be achieved with a clustered transaction coordinator setup.
- 2PC models are focused on individual transactions going wrong.
- Distributed transaction coordinators are designed as single-node or clustered systems with strong network topology affinity.
These aspects do not match the very nature of cloud environments, for the following reasons:
- Cloud environments heavily rely on virtual systems, scaling up and down with increasing and decreasing load, respectively. Accordingly, there is no fixed set of physical nodes interacting at all times.
- It is the very nature of a cloud environment that each and every resource may fail at any time, including network connections. This, and the fact that global placement strategies may interfere with network latency requirements, conflicts with the usual clustering approaches.
- Transactions in the cloud may not only fail due to individual errors, but also due to the failure of complete nodes participating within that same transaction. Moreover, these nodes will most likely not reappear, since they may have been shut down for the sake of elastic down-scaling.
- Cloud environments strongly encourage multi-tenancy, sharing physical (or virtualized) resources, which does not match the single-node approach of distributed transaction coordinators.
Why distributed transactions encourage transactional integration
Another downside of distributed transaction models, such as XA, is that they encourage transactional integration. This post describes transactional integration as “an anti-pattern where transactions extend across services boundaries (i.e. not isolated inside services)”. Although the author focuses on business services in a service-oriented architecture (SOA), the same holds true for distributed transactions spanning multiple resources, such as the previously mentioned database and JMS queues. Here are some of the problems that may arise as a result of transactional integration:
- Performance degradation may occur due to the tight coupling of services and the increasing number of resource locks. This becomes even worse with distributed transactions, where locks may be held for a long time in case of a partial failure, especially when the prepare phase has already completed.
- Services evolving independently of each other may change their transactional behavior in future releases, e.g. a stock management service that starts ordering new items automatically within the context of the caller’s transaction.
- Services may, knowingly or not, lock resources held by other services, which poses a security threat comparable to DoS attacks.
At this point, I would like to highlight the performance degradation with respect to cloud scalability. As I mentioned earlier, most programming models stick to thread-bound transactions, which implies that scaling can only occur at the level of the transactional scope. An increasing number of transactionally coupled services and/or resources results in reduced scalability. The only counter-measure is to increase the number of threads, thus again increasing the number of locked resources, and so on.
Introducing best efforts 1PC transaction handling
While searching for an alternative to distributed XA transactions, I stumbled upon the so-called best efforts 1PC approach explained in this post. As the name suggests, it sticks to one-phase commits and is thus less safe than 2PC. The main idea is to impose a well-defined commit/rollback order, focusing on no more than two transactional resources at a time. In other words, it suggests that there is always one transactional resource that is more important than the other. In the case of an incoming JMS message, the more important resource is the database: the changes applied to it as a result of processing the message must be committed before the message itself is acknowledged, because they cannot be recovered if the message has already been committed when the JVM crashes. The opposite is true for outgoing messages, where the outgoing message must be committed before the database change it originates from. The worst that can happen in either case is a failure between the two commits, leading to duplicate messages at the receiver or the sender, respectively. I will explain later how to deal with that appropriately.
Nevertheless, the strict ordering of commits and rollbacks only solves the problem when dealing with either incoming or outgoing messages, but not both. That is, when an application that receives and processes messages also wants to send new messages, there is no valid commit order within a single transaction. Luckily, the discussion of transactional integration already suggests that such a transaction would be an anti-pattern anyway. So it is as easy as breaking the three actions down into separate transactions:
- a best efforts 1PC for incoming messages, committing the message as a database insert operation first
- another best efforts 1PC for outgoing messages, committing the JMS queue first
- a standard 1PC transaction doing all the processing, i.e. reading messages from a database queue, processing them, and placing outgoing messages onto another database queue
This approach drastically reduces transactional integration while still providing much the same overall safety as XA transactions, provided duplicate messages are dealt with. Moreover, it provides us with the means to scale each of the three tiers as needed, e.g. using individual thread pools, thus embracing cloud scalability.
Message buffering with Spring Integration
With the help of Spring Integration (SI) it is rather easy, though not self-explanatory, to implement best efforts 1PC for scalable cloud applications. As the reference documentation states, there is built-in support for buffering messages using JDBC. For the sake of simplicity, I assume a very simple message flow, as shown in the following picture:
In this example, the application receives incoming messages via JMS and puts them onto the inputMessageChannel. The business logic, represented by a transformer bean, then processes these messages and produces new output messages. It is assumed that the business logic transformer relies on data stored in the database, which is also altered as a result of the transformation. Finally, the messages from the outputMessageChannel are sent via JMS, though not necessarily to the same queue. This example occupies a single thread per message for the whole message flow, and accordingly each message is processed within a single transaction. More specifically, the only point where threads and transactions are actually started is the JMS inbound adapter. No “easy” upscaling is possible in between, should the processing transformer require more CPU.
This can be overcome by introducing a so-called JdbcChannelMessageStore, which has been designed for backing SI message channels with JDBC. This message store is generic, i.e. it must be configured with an appropriate query provider for the underlying database. SI provides a bunch of ready-to-use query providers for the most popular databases, but you may easily write your own or extend an existing one, if necessary. The message channels in my example are then converted into so-called queue channels, backed by the aforementioned message store. This leads us to a configuration as shown in the following picture:
As can be seen, the overall logical message flow remains the same. However, certain channels are now buffered via JDBC, and so-called message bridges pick up buffered messages for further processing within separate threads. The Spring XML configuration for this example looks as follows. Note that infrastructure-related beans and properties have been omitted for readability.
```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- TX manager configurations -->
    <bean id="dbTransactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <constructor-arg ref="dataSource"/>
    </bean>

    <bean id="jmsTransactionManager"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <constructor-arg ref="connectionFactory"/>
    </bean>

    <bean id="inboundJmsTransactionManager"
          class="org.springframework.data.transaction.ChainedTransactionManager">
        <constructor-arg>
            <list>
                <ref bean="jmsTransactionManager"/>
                <ref bean="dbTransactionManager"/>
            </list>
        </constructor-arg>
    </bean>

    <bean id="outboundJmsTransactionManager"
          class="org.springframework.data.transaction.ChainedTransactionManager">
        <constructor-arg>
            <list>
                <ref bean="dbTransactionManager"/>
                <ref bean="jmsTransactionManager"/>
            </list>
        </constructor-arg>
    </bean>

    <!-- JDBC message store -->
    <bean id="jdbcChannelStore"
          class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
        <property name="dataSource" ref="dataSource"/>
        <property name="channelMessageStoreQueryProvider">
            <bean class="${jdbc.channel.store.query.provider}"/>
        </property>
    </bean>

    <!-- SI message flows -->
    <int-jms:inbound-channel-adapter connection-factory="connectionFactory"
                                     destination-name="${jms.queue.name.incoming}"
                                     channel="bufferedInputMessageChannel">
        <int:poller fixed-rate="5000" max-messages-per-poll="-1">
            <int:transactional transaction-manager="inboundJmsTransactionManager"/>
        </int:poller>
    </int-jms:inbound-channel-adapter>

    <int:channel id="bufferedInputMessageChannel" datatype="java.lang.String">
        <int:queue message-store="jdbcChannelStore"/>
    </int:channel>

    <int:bridge input-channel="bufferedInputMessageChannel"
                output-channel="inputMessageChannel">
        <int:poller fixed-rate="100" max-messages-per-poll="-1">
            <int:transactional transaction-manager="dbTransactionManager"/>
        </int:poller>
    </int:bridge>

    <int:channel id="inputMessageChannel"/>

    <int:transformer input-channel="inputMessageChannel"
                     output-channel="bufferedOutputMessageChannel">
        <bean class="service.MyBusinessTransformer"/>
    </int:transformer>

    <int:channel id="bufferedOutputMessageChannel" datatype="java.lang.String">
        <int:queue message-store="jdbcChannelStore"/>
    </int:channel>

    <int:bridge input-channel="bufferedOutputMessageChannel"
                output-channel="outputMessageChannel">
        <int:poller fixed-rate="5000" max-messages-per-poll="-1">
            <int:transactional transaction-manager="outboundJmsTransactionManager"/>
        </int:poller>
    </int:bridge>

    <int:channel id="outputMessageChannel"/>

    <int-jms:outbound-channel-adapter channel="outputMessageChannel"
                                      connection-factory="connectionFactory"
                                      destination-name="${jms.queue.name.outgoing}"/>

</beans>
```
It starts with the definition of the transaction manager beans used to implement best efforts 1PC, both for incoming and outgoing messages. There is a so-called ChainedTransactionManager for each direction: an inboundJmsTransactionManager committing buffered JDBC messages first and an outboundJmsTransactionManager committing JMS queues first. Note that the ChainedTransactionManager starts the chained transactions in the order in which they are configured and commits them in reverse order, which is why the resource to be committed first appears last in each list. The processing tier, represented by the transformer bean, uses the dbTransactionManager only; that is, the business logic is independent of any messaging-related transaction.
Secondly, a JdbcChannelMessageStore is configured referring to the very same data source, assuming that the additional message buffering table has been created beforehand. The message store is then referenced by the buffered message channels, effectively turning them into so-called queue channels. This implies that a message flow’s call stack ends when sending a message to a buffered message channel, and a new one starts when receiving a message from it.
Finally, the pollers drive message consumption: the JMS inbound adapter and both message bridges are each accompanied by a poller configuration. In my example, there are thus three single-threaded pollers: one for incoming messages, one for message processing, and one for outgoing messages. Each poller is configured to consume as many messages as possible (max-messages-per-poll="-1") at a certain polling interval. For the business logic processing it is also possible to provide a separate task executor within the poller configuration, thus turning it into a multi-threaded poller, if needed, as sketched below. This provides individual scaling capabilities for each of the tiers, although multi-threaded JMS polling and/or sending doesn’t make much sense in my opinion.
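As a minimal sketch of such a multi-threaded processing tier, the bridge poller can be handed a task executor; this assumes the Spring task namespace is declared, and the executor name and pool size are illustrative:

```xml
<task:executor id="processingExecutor" pool-size="4"/>

<!-- the processing bridge from the example above, now polling with four
     threads; each message is still processed in its own DB transaction -->
<int:bridge input-channel="bufferedInputMessageChannel"
            output-channel="inputMessageChannel">
    <int:poller fixed-rate="100" max-messages-per-poll="-1"
                task-executor="processingExecutor">
        <int:transactional transaction-manager="dbTransactionManager"/>
    </int:poller>
</int:bridge>
```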
Common Pitfalls
It is worth noting that the approach described above also bears some pitfalls, which have not been mentioned so far for the sake of simplicity. Here is a list of things to keep in mind:
- The observant reader may have noticed that I have chosen strongly typed buffered message channels, i.e. java.lang.String. This is due to the fact that messages may be consumed from a buffered channel by a different application release, e.g. immediately after a new one has been deployed. Therefore, potentially non-serializable or incompatible content must be avoided at all cost. This includes any message headers previously assigned, e.g. by message header enrichers. I suggest buffering messages using primitive types, such as strings, and removing any unused header attributes before doing so (see the header filter sketch after this list).
- Another common mistake is changing the name of a buffered channel while there are still messages in the queue; this must not happen, because the channel name, e.g. bufferedInputMessageChannel, is used as part of the primary key within the buffering table.
- Another aspect related to the database buffer is the fact that the message store implementation polls messages from the database and deletes them afterwards. After all, a database does not, by default, support atomic select/delete operations. Hence, it is important that the polling statement ignores any rows locked by other threads, i.e. that it reads past them. This is already implemented for the Oracle query provider, but it may be a necessary enhancement for the database in question. Although the reference documentation mentions the usage of an id cache to avoid this problem, I strongly suggest the SQL way of providing an appropriate query hint, since caching won’t work in clustered environments, i.e. with multiple JVMs, anyway (see the message store sketch after this list). Also, make sure you have enabled MVCC, as the documentation suggests; it is not enabled by default on all databases, and deadlocks may occur if you fail to do so.
- Duplicate message detection has not been covered in the example so far. I suggest performing duplicate detection within the input message flow by means of payload digests, e.g. using the SHA-512 digester from Spring Security (see the filter sketch after this list). The digest can safely be stored within the same transaction and be used in subsequent transactions for detecting potentially duplicate messages. There are rumours that SI 3.0 is going to support this so-called idempotent consumer pattern out of the box. Be sure not to rely on the JMS message id for detecting duplicates, as this only covers duplicates due to failed receptions, not those where the sending application resends a message due to errors on the sender side. Remember that I do not use 2PC here!
- Finally, there is the risk of messages going postal (poison messages) within the processing tier, that is, messages causing transactional rollbacks. While rollbacks due to temporary, e.g. technical, problems are fine, errors resulting from bugs are a problem: the message itself will be rolled back onto the buffered message channel as well, so the next thread will pick up the same message immediately afterwards, and so on. Unless you really enjoy heavy load on your systems without any perceivable outcome, I suggest introducing some generic message error counting within the processing tier. I solved this by introducing another table storing message ids and error counts. The trick here is to store/increase a message’s error count as part of an after-rollback operation running within a new transaction (requires-new propagation). This can be achieved by means of an SI transaction synchronization factory, as sketched after this list. This means that the transaction currently being rolled back is responsible for increasing the error count of the message it is processing. The next transaction may then check the error count before passing the message on to any business layer, discarding it once it has reached a certain (configurable) limit. Another before-commit operation should be registered for cleaning up the message error counts; the latter, of course, must take part in the same transaction, e.g. using mandatory propagation.
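Regarding header cleanup before buffering, a header filter can be placed in front of a buffered channel. A minimal sketch, assuming the business flow assigns the hypothetical headers myCustomHeader and anotherCustomHeader somewhere upstream:

```xml
<!-- strips custom (potentially non-serializable) headers before the
     message is persisted to the buffered output channel; the input
     channel name and header names are illustrative -->
<int:header-filter input-channel="enrichedOutputMessageChannel"
                   output-channel="bufferedOutputMessageChannel"
                   header-names="myCustomHeader, anotherCustomHeader"/>
```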
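Regarding the polling statements, the query provider is simply swapped on the message store. A sketch for Oracle, whose provider ships with read-past semantics; the usingIdCache flag shown here is the id cache mentioned in the reference documentation, explicitly left disabled:

```xml
<bean id="jdbcChannelStore"
      class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider">
        <!-- polls with a read-past query hint, ignoring locked rows -->
        <bean class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider"/>
    </property>
    <!-- the id cache does not help across multiple JVMs anyway -->
    <property name="usingIdCache" value="false"/>
</bean>
```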
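Regarding duplicate detection, a filter can be placed between the JMS inbound adapter and the buffered input channel, i.e. within the inbound best efforts 1PC transaction. The following is a sketch only: DuplicateMessageFilter is a hypothetical bean that computes an SHA-512 digest of the payload, inserts it into a digest table guarded by a unique constraint, and returns false for digests it has already seen. The inbound adapter would then target rawInputMessageChannel instead of the buffered channel:

```xml
<int:channel id="rawInputMessageChannel" datatype="java.lang.String"/>

<!-- duplicates are silently dropped via the built-in nullChannel -->
<int:filter input-channel="rawInputMessageChannel"
            output-channel="bufferedInputMessageChannel"
            discard-channel="nullChannel"
            ref="duplicateMessageFilter" method="isFirstReception"/>

<bean id="duplicateMessageFilter" class="service.DuplicateMessageFilter"/>
```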
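Regarding error counting, a transaction synchronization factory can be attached to the processing poller. Again a sketch: errorCountingService is a hypothetical bean whose incrementErrorCount method is marked with requires-new propagation and whose clearErrorCount method uses mandatory propagation, as discussed above:

```xml
<int:transaction-synchronization-factory id="errorCountSyncFactory">
    <!-- on rollback, hand the payload to the error counting flow -->
    <int:after-rollback expression="payload" channel="errorCountChannel"/>
    <!-- on commit, clean up any previously recorded error count -->
    <int:before-commit expression="payload" channel="errorCountCleanupChannel"/>
</int:transaction-synchronization-factory>

<!-- the processing bridge, now registering the synchronizations -->
<int:bridge input-channel="bufferedInputMessageChannel"
            output-channel="inputMessageChannel">
    <int:poller fixed-rate="100" max-messages-per-poll="-1">
        <int:transactional transaction-manager="dbTransactionManager"
                           synchronization-factory="errorCountSyncFactory"/>
    </int:poller>
</int:bridge>

<int:channel id="errorCountChannel"/>
<int:service-activator input-channel="errorCountChannel"
                       ref="errorCountingService" method="incrementErrorCount"/>

<int:channel id="errorCountCleanupChannel"/>
<int:service-activator input-channel="errorCountCleanupChannel"
                       ref="errorCountingService" method="clearErrorCount"/>

<bean id="errorCountingService" class="service.ErrorCountingService"/>
```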
Conclusion
As you have seen, designing scalable, cloud-ready messaging applications isn’t something that comes for free. Neither is it fully covered by frameworks such as Spring Integration, yet. Nevertheless, the building blocks are all there. I strongly recommend writing automated integration tests, preferably using Spring test contexts, for the corner cases mentioned earlier. Finally, I want to summarize what I think are the main benefits of this approach:
- The design does not rely on heavyweight XA transactions, nor does it foster the use of full-blown application servers for their sake. It can easily be incorporated into any JEE application, though preferably a Spring-based one. It provides much the same safety as XA at the price of in-application message deduplication. However, duplication is also a problem where two applications communicate with each other and not both of them use XA properly. And last but not least, it is ready for the cloud.
- The approach is far superior when it comes to automated testing, especially integration testing. I figured that integration tests involving JMS are way more tedious, because one cannot simply put test messages onto a queue before testing within the same transaction, due to transactional isolation. The thing is, it is not my intention to test JMS anyway; it is simply a messaging API that my application uses. With the processing tier solely relying on database transactions, writing test cases boils down to putting test messages onto a buffered queue channel before the test and letting the rollback clean up once you’re done.
- The asynchronous design of this approach fosters loose coupling and scalability. It is as easy as adding a thread pool for the processing tier if more computing power is needed. Furthermore, individual parts of the application can be shut down or brought up, e.g. using poller properties or SI’s JMX integration. Shutting down message brokers for administrative purposes no longer affects the applications communicating with them. Finally, the load put onto the application by the user interface no longer stresses the messaging infrastructure: sending a message as part of an interactive user action boils down to buffering that very message in the database. You may even decide to use buffering within the application itself, e.g. for delayed or partial processing using separate threads.
- The decoupling of technologies improves the overall application robustness when it comes to non-transactional resources, such as sending e-mails. Infrastructure downtimes, e.g. unreachable mail servers, no longer result in a rollback of otherwise successfully processed business logic. Incoming or outgoing mails can simply be put onto a buffered message channel and processed asynchronously.
- Storing messages within the database enables you to perform resequencing, if necessary; Spring Integration already provides an out-of-the-box mechanism for that. So, in case the order of messages is important, one may easily hold back messages until they satisfy a certain order. Nonetheless, database queuing is often considered an anti-pattern in its own right, so one may just as well decide to go with local JMS channels, as suggested by Clemens Vasters.
Now it’s up to you to share your Spring Integration experiences!
Comments
Aneer
Frank, very informative post. If you can, please publish a simple working example. Thank you.
Frank Scheffler
If you transfer data from one resource to another, it is generally advisable to commit the destination resource first, because otherwise you may lose data if the second commit fails. In your case, if I understand you correctly, the AMQP queue should be committed first, then the Tibco topics.
Unfortunately, I am not sure whether AMQP provides a Spring transaction manager which could then be chained. It may be that you have to implement this programmatically.
rao
Thanks for the good explanation. I want to publish messages from a Tibco EMS topic to AMQP queues. Please let me know how I can use the chained transaction manager in this case.