Page 1 of 2 12 LastLast
Results 1 to 10 of 13

Thread: CannotCreateTransactionException when using splitter and dispatcher

  1. #1
    Join Date
    Aug 2011
    Posts
    7

    Default CannotCreateTransactionException when using splitter and dispatcher

    I have a chain that uses a splitter that splits a request into 10 separate threads using a dispatcher/task-executor.

    We use Spring Data repositories to retrieve data. While I am able to retrieve data before the splitter splits the request, when querying for data post the split, I get CannotCreateTransactionException.

    Code:
    Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
    	at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:382)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:371)
    	at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:335)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:105)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:155)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    My pipeline configuration is as follows:
    Code:
    	<si:chain input-channel="emsValuationServiceIn" output-channel="requestSplitterChannel">
    		<si:filter ref="requestFilter" method="accept" />
    		<si:transformer ref="valuationProcessor" />
    		<si:splitter ref="positionContextSplitter" method="splitRequest" />
    	</si:chain>
                  <si:channel id="requestSplitterChannel" >
    		<si:dispatcher task-executor="positionRequestTaskExecutor" />
    	</si:channel>
                <bean id="positionRequestTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
    		<property name="corePoolSize" value="10" />
    		<property name="daemon" value="false" />
    	</bean>
    <si:chain input-channel="requestSplitterChannel" output-channel="aggregatorRequestChannel"  >
    		<si:transformer ref="processorA" />
    		<si:transformer ref="processorB" />
    	</si:chain>
    The documentation of executor channel does mention that:"this does break the "single-threaded" execution context between sender and receiver so that any active transaction context will not be shared by the invocation of the handler". So since a new transaction would be needed post the 'requestSplitterChannel' above, we did add @Transactional to processorA and processorB, assuming that would tell Spring to create a new transaction within those threads. But that does not work and gives us the exception mentioned above.

    Is there a way of introducing a new transaction here? Any help would be appreciated.

    P.S: I am using Spring Integration 2.0.3.RELEASE.

  2. #2
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    Can you post the complete exception stack trace?

  3. #3
    Join Date
    Aug 2011
    Posts
    7

    Default

    Here it is:

    Code:
    2011-11-18 12:07:36,904 ERROR (positionRequestTaskExecutor-1)[notification.ExceptionNotificationHandler] 
     Error occured - processing on item will not continue 
    org.springframework.integration.transformer.MessageTransformationException: org.springframework.integration.MessageHandlingException: com.nomura.spb.infrastructure.exception.notification.ErrorException: Error processing message in the pipeline
    	at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:73)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:150)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:150)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:176)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:160)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:125)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:119)
    	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:101)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:133)
    	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:51)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:92)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:619)
    Caused by: org.springframework.integration.MessageHandlingException: com.nomura.spb.infrastructure.exception.notification.ErrorException: Error processing message in the pipeline
    	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
    	at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:56)
    	at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:67)
    	... 29 more
    Caused by: com.nomura.spb.infrastructure.exception.notification.ErrorException: Error processing message in the pipeline
    	at com.nomura.spb.infrastructure.exception.notification.PublishingExceptionNotifier.error(PublishingExceptionNotifier.java:28)
    	at com.nomura.spb.services.valuationservice.pipeline.base.PositionContextTransformer.handleUnexpectedException(PositionContextTransformer.java:21)
    	at com.nomura.spb.services.valuationservice.pipeline.base.ExceptionHandlingPipelineElement.doHandle(ExceptionHandlingPipelineElement.java:39)
    	at com.nomura.spb.services.valuationservice.pipeline.base.TransformerPipelineElement.handle(TransformerPipelineElement.java:32)
    	at sun.reflect.GeneratedMethodAccessor400.invoke(Unknown Source)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
    	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:84)
    	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
    	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
    	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
    	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:114)
    	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:223)
    	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:123)
    	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
    	... 31 more
    Caused by: com.nomura.spb.services.valuationservice.exception.PipelineException: com.nomura.spb.services.valuationservice.exception.SwapException: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
    	at com.nomura.spb.services.valuationservice.exception.ExceptionContext.forPipeline(ExceptionContext.java:69)
    	... 46 more
    Caused by: com.nomura.spb.services.valuationservice.exception.SwapException: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
    	... 47 more
    Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.NullPointerException
    	at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:382)
    	at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:371)
    	at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:335)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:105)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:155)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    	at $Proxy241.findOne(Unknown Source)
    	at com.nomura.spb.services.valuationservice.pipeline.DividendCashflowPropertyUtility.getMarketBySecurity(DividendCashflowPropertyUtility.java:55)
    	at com.nomura.spb.services.valuationservice.pipeline.DividendCashflowPropertyUtility.getDividendConventionBySecurity(DividendCashflowPropertyUtility.java:39)
    	at com.nomura.spb.services.valuationservice.pipeline.DividendsProcessor.process(DividendsProcessor.java:132)
    	at com.nomura.spb.services.valuationservice.pipeline.base.ExceptionHandlingPipelineElement.doHandle(ExceptionHandlingPipelineElement.java:37)
    	... 44 more
    Caused by: java.lang.NullPointerException
    	at org.hibernate.transaction.JDBCTransaction.begin(JDBCTransaction.java:81)
    	at org.hibernate.impl.SessionImpl.beginTransaction(SessionImpl.java:1473)
    	at org.hibernate.ejb.TransactionImpl.begin(TransactionImpl.java:60)
    	at org.springframework.orm.jpa.DefaultJpaDialect.beginTransaction(DefaultJpaDialect.java:70)
    	at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:57)
    	at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:332)
    	... 56 more

  4. #4
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    Hi Pankil,
    I developed a similar sample where we get a message over a channel to the splitter which in turn puts the split messages over an executor output channel.
    The executor channel is an input to a transformer which transforms the message and puts it over another output channel. Now this transformer also stores the original and transformed message in the database using JPA and i have put a @Transactional annotation around the method and injected an EntityManager in the bean.
    This whole setup seems to work fine with me without any problem

    What version of Hibernate Entity manager are you using? Can you possible try using a different version of the Entity manager? I have used version 3.5.5-Final for entity manager and spring 3.0.6.

    Lets see if this solves your problem, else we can look at some other way to sort the problem out.

  5. #5
    Join Date
    Aug 2011
    Posts
    7

    Default

    Thanks Amol for trying this.

    We use 3.1.0.M2 Spring and 3.6.0.FINAL Hibernate Entity Manager. Would it be possible for you to attach your sample project if it is not too huge?

    Thanks again!

  6. #6
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    Hi Pankil,

    I went through your stack trace and i guess your @Transformer method in the transformer bean need to be looked. I'll try attaching the code here, meanwhile can you possible use the versions i recommended for spring and hibernate and check if we face the same issue?

  7. #7
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    The Spring config file used

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns:integration="http://www.springframework.org/schema/integration"
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xmlns:task="http://www.springframework.org/schema/task"	
    	xsi:schemaLocation=
    	"http://www.springframework.org/schema/beans http://www.springframework.org/schem...-beans-3.0.xsd
    	http://www.springframework.org/schema/task	http://www.springframework.org/schem...g-task-3.0.xsd
    	http://www.springframework.org/schema/integration http://www.springframework.org/schem...ration-2.1.xsd
    	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    	
    	<tx:annotation-driven/>
    	
    	<integration:channel id="inputChannel"/>
    	
    	<integration:channel id="spiltterOutputChannel">
    		<integration:dispatcher task-executor="taskExecutor"/>
    	</integration:channel>
    	
    	<integration:channel id="outputChannel"/>
    	
    	<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
    		<property name="corePoolSize" value="10" />	
    	</bean>
    	
    	<bean id="splitterBean" class="com.example.integration.test.forum.SplitterBean"/>
    	<integration:splitter input-channel="inputChannel" output-channel="spiltterOutputChannel" ref="splitterBean"/>
    	
    	<bean id="transformer" class="com.example.integration.test.forum.TransformerBean"/>
    	
    	<integration:transformer input-channel="spiltterOutputChannel" output-channel="outputChannel" ref="transformer"/>
    	
    	 
    	
    	<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    		<property name="driverClassName" value="org.apache.derby.client.ClientXAConnection"/>
    		<property name="url" value="jdbc:derby://localhost:1527/TestDatabase"/>
    	</bean>	
    		
    
    	 
    	<bean class="org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor"/>
    	 
    	<bean id="lc" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    		<property name="dataSource" ref="dataSource"/>	
    	</bean>	
    
    	
    	<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    		<constructor-arg ref="lc"/>		
    	</bean>
    	 
    	
    </beans>
    The Splitter bean
    Code:
    public class SplitterBean {
    
    	
    	@Splitter
    	public Collection<String> split(String payload) {
    		String[] splits = payload.split(",");		
    		return Arrays.asList(splits);
    	}
    }
    and transformer

    Code:
    public class TransformerBean {
    
    	@PersistenceContext
    	private EntityManager em;
    
    	@Transformer
    	@Transactional(propagation=Propagation.REQUIRED)
    	public String transform(String payload) {
    		String transformed = payload.toUpperCase();
    		SplitAudit audit = new SplitAudit();
    		audit.setSplit(transformed);
    		audit.setTime(new Date());
    		em.persist(audit);
    		return transformed;
    	}
    }
    SplitAudit is a simple POJO JPA entity and hence not included (please note the example is a very poor use case but just implemented to check if the problem is faced )

    Input to the splitter is a simple String message with payload say "test,test1,test2"

    Also can you try invoking the method annotated with @Transformer of your transformer bean from a junit providing the method some dummy input value? It would be interesting to see the result. It should ideally be failing there too.

  8. #8
    Join Date
    Aug 2011
    Posts
    7

    Default

    Thanks Amol for trying this out. We tried to replicate the issue with your config.

    If we make TransformerBean implement any interface e.g. Cloneable we get the stack below, the only way around it seems to be to make it also implement Transformer.

    Code:
    Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1004E:(pos 8): Method call: Method transform(org.springframework.core.convert.TypeDescriptor) cannot be found on $Proxy147 type
    	at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:182)
    	at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:106)
    	at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
    	at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
    	at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
    	at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:114)
    	at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:223)
    	at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:123)
    	at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)

    Can anyone from spring integration team comment on whether this is an issue and can be fixed?

  9. #9
    Join Date
    Oct 2011
    Location
    Mumbai, India
    Posts
    213

    Default

    Hi Pankil,

    Any reason why you want the transformer bean to implement the Cloneable interface?

    You are getting this exception because, the proxy generated is a JDK proxy the moment you implement an interface in your bean. Using CGLIB proxy should solve your problem

    You may implement any interfaces you want but add <aop:config proxy-target-class="true"/> to your spring config file.

    This will ensure CGLIB proxies are generated for all eligible beans and your exception will go away.

  10. #10
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    The CGLIB proxy should indeed solve the immediate issue, OR you should be able to simply provide the method name in the config of your "transformer" element (e.g. method="transform" in addition to the ref).

    Otherwise, you can also provide your own interface (like Transformer with the transform() method). That way that interface will also be implemented by the JDK proxy. If you take that approach, try moving the @Transformer annotation to the interface method declaration instead of the implementation.

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •