For those interested, I have implemented a wrapper around Spring AMQP that keeps an in-memory list of pending messages, which are then sent after the platform's transaction has committed.
That is, if you do something like:
Code:
@Transactional
public void doSomeDbStuffAndSendMessage() {
    // Do some DB stuff and put the id of an entity into Message m.
    customRabbitTemplate.send(m);
    // Maybe do some more DB stuff.
}
With the current Spring AMQP/RabbitMQ model you would have to use RabbitMQ transactions (TX), which are god-awfully slow.
The reality is most of the time you just want the messages to be sent after the transaction has committed.
Below is a rough idea of how I implemented this. I think Spring AMQP could offer something like this, as the above use case is a very common one (I have seen dozens of Stack Overflow questions asking "why is my entity not persisted yet?").
Code:
// Optional here is Guava's com.google.common.base.Optional (hence absent()).
public static Optional<MessageBusResourceHolder> getTransactionalResourceHolder(TxMessageBus messageBus) {
    // Outside a transaction there is nothing to defer.
    if (!TransactionSynchronizationManager.isActualTransactionActive()) {
        return Optional.absent();
    }
    // Reuse the holder already bound to this transaction, if any.
    MessageBusResourceHolder o =
            (MessageBusResourceHolder) TransactionSynchronizationManager.getResource(messageBus);
    if (o != null) {
        return Optional.of(o);
    }
    // First use in this transaction: bind a new holder and register the synchronization.
    o = new MessageBusResourceHolder();
    TransactionSynchronizationManager.bindResource(messageBus, o);
    o.setSynchronizedWithTransaction(true);
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        TransactionSynchronizationManager.registerSynchronization(
                new MessageBusResourceSynchronization(o, messageBus));
    }
    return Optional.of(o);
}

private static class MessageBusResourceSynchronization
        extends ResourceHolderSynchronization<MessageBusResourceHolder, TxMessageBus> {

    private final TxMessageBus messageBus;
    private final MessageBusResourceHolder holder;

    public MessageBusResourceSynchronization(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey) {
        super(resourceHolder, resourceKey);
        this.messageBus = resourceKey;
        this.holder = resourceHolder;
    }

    @Override
    protected void cleanupResource(MessageBusResourceHolder resourceHolder, TxMessageBus resourceKey,
            boolean committed) {
        resourceHolder.getPendingMessages().clear();
    }

    @Override
    public void afterCompletion(int status) {
        if (status == TransactionSynchronization.STATUS_COMMITTED) {
            // The transaction committed: now actually send the buffered messages.
            for (Object o : holder.getPendingMessages()) {
                messageBus.post(o, false);
            }
        } else {
            // Rolled back (or unknown): drop the buffered messages.
            holder.getPendingMessages().clear();
        }
        super.afterCompletion(status);
    }
}
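The holder class and the bus itself are not shown above, so here is a minimal stand-alone sketch of just the buffering behaviour, with no Spring dependency. The names `PendingMessageBuffer` and `DeferredSendDemo` are made up for illustration and are not part of my wrapper or of Spring AMQP; the `afterCompletion(boolean)` method only imitates what the synchronization callback does with the commit status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the resource holder: collects messages posted
// during a transaction and only hands them to the real sender on commit.
final class PendingMessageBuffer {
    private final List<Object> pending = new ArrayList<>();
    private final Consumer<Object> sender; // assumed: whatever really publishes the message

    PendingMessageBuffer(Consumer<Object> sender) {
        this.sender = sender;
    }

    // Called inside the transaction: buffer only, do not send yet.
    void post(Object message) {
        pending.add(message);
    }

    // Imitates the afterCompletion callback: send on commit, drop on rollback.
    void afterCompletion(boolean committed) {
        try {
            if (committed) {
                for (Object message : pending) {
                    sender.accept(message);
                }
            }
        } finally {
            pending.clear(); // the buffer never outlives its transaction
        }
    }

    int pendingCount() {
        return pending.size();
    }
}

final class DeferredSendDemo {
    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        PendingMessageBuffer buffer = new PendingMessageBuffer(sent::add);

        buffer.post("entity-1");
        buffer.post("entity-2");
        System.out.println(sent); // prints [] because nothing is sent before commit

        buffer.afterCompletion(true);
        System.out.println(sent); // prints [entity-1, entity-2]

        buffer.post("entity-3");
        buffer.afterCompletion(false); // rollback: entity-3 is discarded
        System.out.println(sent); // prints [entity-1, entity-2]
    }
}
```

In the real code the buffer is bound to the transaction through `TransactionSynchronizationManager`, so each transaction gets its own list; the sketch leaves that part out on purpose.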


