Thread: Start / Stop a JMS Bridge with Control Bus

  1. #1
    Join Date
    Jun 2009
    Posts
    8

    Hi,
    I'm a bit confused about how to start/stop a bridge between two JMS adapters (inbound and outbound).

    This is the config I'm using:


    Code:
    	<task:scheduler id="taskScheduler" pool-size="10" />
    
    	<poller id="defaultPoller" fixed-delay="1000"
    		max-messages-per-poll="50" receive-timeout="60000" default="true">
    		<transactional />
    	</poller>
    
    	<!-- Inbound / Outbound channel adapters -->
    	<jms:inbound-channel-adapter id="inboundAdapter" destination="inputQueue" />
    	<jms:outbound-channel-adapter id="outboundAdapter" destination="outputQueue" />
    
    	<!-- JMS bridges -->
    	<bridge id="bridge" input-channel="inboundAdapter" output-channel="outboundAdapter" />
    
    	<!-- Control channel -->
    	<channel id="controlChannel" />
    	<control-bus input-channel="controlChannel" />
    And the Java code that sends the control messages:

    Code:
    ApplicationContext ac = new ClassPathXmlApplicationContext("spring-context.xml");
    MessageChannel controlChannel = ac.getBean("controlChannel", MessageChannel.class);

    // Let the bridge run for a moment, then ask the control bus to stop it.
    Thread.sleep(2000);
    System.out.println("Stopping bridge....");
    controlChannel.send(new GenericMessage<String>("@inboundAdapter.stop()"));

    // Wait, then ask the control bus to start it again.
    Thread.sleep(5000);
    System.out.println("Starting bridge....");
    controlChannel.send(new GenericMessage<String>("@inboundAdapter.start()"));

    But this does not work. The first send throws:

    Exception in thread "main" org.springframework.integration.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@3eb52a28]]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:110)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:44)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
        at com.sfr.bps.bridges.Main.main(Main.java:17)
    Caused by: org.springframework.expression.EvaluationException: The method 'stop' is not supported by this command processor. If using the Control Bus, consider adding @ManagedOperation or @ManagedAttribute.
        at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.validateMethod(ExpressionCommandMessageProcessor.java:100)
        at org.springframework.integration.handler.ExpressionCommandMessageProcessor$ExpressionCommandMethodResolver.resolve(ExpressionCommandMessageProcessor.java:81)
        at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:174)
        at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:107)
        at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
        at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
        at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:98)
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:94)
        at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:63)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:64)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:98)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        ... 6 more


    I've found a small workaround: fetch the inboundAdapter.adapter bean and call its stop() and start() methods directly. But that bypasses the control bus...
    Code:
    AbstractEndpoint adapter = ac.getBean("inboundAdapter.adapter", AbstractEndpoint.class);
    adapter.stop();
    But this logs an InterruptedException.

    Any ideas? What am I doing wrong?
    Thanks a lot for your help.
    Last edited by osgiliath; May 30th, 2011 at 07:57 AM. Reason: precisions

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    If you use the adapter id to name the channel (instead of explicitly configuring a 'channel' attribute), then the adapter gets a bean name of <channel>.adapter; in your case the command would be "@'inboundAdapter.adapter'.stop()" (the same bean you found in your workaround). @inboundAdapter.stop() is trying to 'stop' a channel.

    You can explicitly name the channel, say 'inbound', instead of relying on the automatic naming; then you can stop the adapter using its id.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
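
A minimal sketch of the naming rule described in this reply, in plain Java with no Spring dependency. The class and method names (`ControlBusCommands`, `forImplicitChannel`, `forExplicitChannel`) are hypothetical helpers, not part of Spring Integration; they only build the command strings that would be sent to the control channel. The quotes around the bean name are needed because the name contains a dot, which SpEL would otherwise read as a property access.

```java
public class ControlBusCommands {

    // Hypothetical helper: the adapter has no 'channel' attribute, so its id
    // names the channel and the adapter bean itself is "<id>.adapter".
    public static String forImplicitChannel(String adapterId, String operation) {
        return "@'" + adapterId + ".adapter'." + operation + "()";
    }

    // Hypothetical helper: the adapter has an explicit 'channel' attribute,
    // so the adapter bean is registered under its own id.
    public static String forExplicitChannel(String adapterId, String operation) {
        return "@" + adapterId + "." + operation + "()";
    }

    public static void main(String[] args) {
        System.out.println(forImplicitChannel("inboundAdapter", "stop"));
        System.out.println(forExplicitChannel("inboundAdapter", "stop"));
    }
}
```

Either string would then be the payload of the message sent to controlChannel, as in the first post.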

  3. #3
    Join Date
    Jun 2009
    Posts
    8

    Hi,
    Thanks for the answer.

    It's working. I tried with
    controlChannel.send(new GenericMessage<String>("@'inboundAdapter.adapter'.stop()"));
    controlChannel.send(new GenericMessage<String>("@'inboundAdapter.adapter'.start()"));
    but I still get some exception logs that don't look very clean to me:

    INFO SourcePollingChannelAdapter - stopped inboundAdapter.adapter
    ERROR LoggingHandler - org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: java.lang.InterruptedException
        at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
        at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
        at org.springframework.jms.core.JmsTemplate.receiveSelected(JmsTemplate.java:695)
        at org.springframework.integration.jms.JmsDestinationPollingSource.doReceiveJmsMessage(JmsDestinationPollingSource.java:115)
        at org.springframework.integration.jms.JmsDestinationPollingSource.receive(JmsDestinationPollingSource.java:93)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.doPoll(SourcePollingChannelAdapter.java:89)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:145)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
        at $Proxy4.call(Unknown Source)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:206)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:201)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:51)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:680)
    Caused by: javax.jms.JMSException: java.lang.InterruptedException
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:483)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:504)
        at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:777)
        at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:741)
        at org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:722)
        at org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:697)
        at org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:1)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
        ... 32 more
    Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:452)
        ... 39 more

    Starting bridge....
    INFO SourcePollingChannelAdapter - started inboundAdapter.adapter
    Is this the expected behaviour?
    Thanks.

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Yes, given that you are using a polling channel adapter, we have to interrupt the thread that's hanging on the receive; I suppose we could consider suppressing the log message if the error is a result of stopping the adapter. Consider entering an 'Improvement' JIRA issue if you feel strongly about it.

    Also, consider using a message-driven-channel-adapter instead; in that case, I believe you will only get a log message if you happen to stop the adapter just as a message is being received, forcing it to be rolled back onto the queue.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware
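
To illustrate why stopping a polling adapter surfaces an InterruptedException, here is a small analogy in plain Java. It is only a sketch under assumptions: the class `InterruptedReceiveDemo` and its method are hypothetical, and a `BlockingQueue.poll` with a 60-second timeout stands in for the blocking JMS receive (receive-timeout="60000" in the original config). It does not use Spring Integration or ActiveMQ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptedReceiveDemo {

    // Returns true if the blocked "receive" ended with an InterruptedException.
    public static boolean stopBlockedPoller() {
        BlockingQueue<String> emptyQueue = new LinkedBlockingQueue<>();
        AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread poller = new Thread(() -> {
            try {
                // Stand-in for a receive call waiting on an empty queue.
                emptyQueue.poll(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // This is the condition that ends up in the error log.
                interrupted.set(true);
            }
        });

        try {
            poller.start();
            Thread.sleep(200);   // give the poller time to block
            poller.interrupt();  // analogous to stopping the adapter mid-receive
            poller.join(5000);
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("interrupted while waiting: " + stopBlockedPoller());
    }
}
```

The waiting thread has no way to return early other than being interrupted, which is why the stop shows up as an exception instead of a clean return.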

  5. #5
    Join Date
    Jun 2009
    Posts
    8

    OK, but I'll keep the poller, since I need the max-messages-per-poll and fixed-delay properties to act as a flow controller.
    Thanks.
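
As a rough worked example of the flow control mentioned here, the sketch below computes an upper bound on throughput from the two poller settings in the first post (max-messages-per-poll="50", fixed-delay="1000"). The class and method names are hypothetical. The bound ignores the time spent receiving and processing; since fixed-delay is measured from the end of one poll to the start of the next, the real rate will be lower.

```java
public class PollerThroughput {

    // Upper bound in messages per second: at most maxMessagesPerPoll messages
    // are taken per poll, and polls are at least fixedDelayMillis apart.
    public static double maxMessagesPerSecond(int maxMessagesPerPoll, long fixedDelayMillis) {
        return maxMessagesPerPoll * 1000.0 / fixedDelayMillis;
    }

    public static void main(String[] args) {
        // Values taken from the poller definition in the first post.
        System.out.println(maxMessagesPerSecond(50, 1000));
    }
}
```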
