Hello,
I would like to know exactly how the task executor works in a publish-subscribe channel. Is it supposed to be a method or a class? Can anyone provide an example that uses a task executor without annotations? Thanks
Last edited by ashleyvijay; Oct 23rd, 2008 at 03:55 AM.
To register subscribers, the PublishSubscribeChannel exposes subscribe/unsubscribe methods that accept MessageConsumer instances. These methods are inherited from AbstractSubscribableChannel.
The PublishSubscribeChannel creates an instance of BroadcastingDispatcher and passes the task executor to it, if one is provided. The task executor is then used to execute a Runnable for each message being passed to each of the subscribed consumers.
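To make the mechanics concrete, here is a minimal plain-Java sketch of the idea. It is not the Spring Integration source: BroadcastSketch and its subscribe/unsubscribe/send methods are hypothetical names, java.util.function.Consumer stands in for MessageConsumer, and java.util.concurrent.Executor stands in for Spring's TaskExecutor. No annotations are needed for any of this.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a broadcasting dispatcher; NOT the Spring Integration class.
class BroadcastSketch<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();
    private final Executor executor; // null means: deliver on the sender's thread

    BroadcastSketch(Executor executor) {
        this.executor = executor;
    }

    boolean subscribe(Consumer<T> consumer) {
        return subscribers.add(consumer);
    }

    boolean unsubscribe(Consumer<T> consumer) {
        return subscribers.remove(consumer);
    }

    // One Runnable per (message, subscriber) pair, handed to the executor if present.
    void send(T message) {
        for (Consumer<T> subscriber : subscribers) {
            Runnable delivery = () -> subscriber.accept(message);
            if (executor == null) {
                delivery.run();
            } else {
                executor.execute(delivery);
            }
        }
    }

    // Sends one message to two subscribers through a small pool and returns what they saw, sorted.
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        BroadcastSketch<String> channel = new BroadcastSketch<>(pool);
        channel.subscribe(m -> { seen.add("one:" + m); done.countDown(); });
        channel.subscribe(m -> { seen.add("two:" + m); done.countDown(); });
        try {
            channel.send("hello");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscribers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        List<String> sorted = new ArrayList<>(seen);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both subscribers receive the same message; with an executor in place, the order in which they run is not guaranteed, which is why the demo sorts the result before returning it.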
If you wish to register a subscriber which does not implement MessageConsumer, then MethodInvokingConsumer can be used to wrap it.
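As a rough illustration of what such a wrapper does, the sketch below adapts an arbitrary object to a consumer interface by looking up a named method reflectively. This is only my guess at the general technique, not the MethodInvokingConsumer implementation: MethodInvokingSketch and PlainListener are hypothetical classes, and java.util.function.Consumer again stands in for MessageConsumer.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical adapter; NOT Spring Integration's MethodInvokingConsumer.
class MethodInvokingSketch implements Consumer<Object> {
    private final Object target;
    private final Method method;

    MethodInvokingSketch(Object target, String methodName) {
        this.target = target;
        Method found = null;
        // Pick the first public single-argument method with the requested name.
        for (Method candidate : target.getClass().getMethods()) {
            if (candidate.getName().equals(methodName) && candidate.getParameterCount() == 1) {
                found = candidate;
                break;
            }
        }
        if (found == null) {
            throw new IllegalArgumentException("No single-argument method named " + methodName);
        }
        this.method = found;
    }

    @Override
    public void accept(Object payload) {
        try {
            method.invoke(target, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke " + method.getName(), e);
        }
    }
}

// Hypothetical plain object that knows nothing about messaging interfaces.
class PlainListener {
    final List<String> received = new ArrayList<>();

    public void onMessage(String text) {
        received.add(text);
    }
}
```

Wrapping is then a one-liner: `new MethodInvokingSketch(new PlainListener(), "onMessage")` can be handed to anything that expects a consumer.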
The PublishSubscribeChannel accepts any instance of org.springframework.core.task.TaskExecutor; several implementations are provided by core Spring. There is also namespace support for creating a thread-pool-based task executor; see the schema for the thread-pool-task-executor element.
Jonas Partner
OpenCredo
Hey, thanks for the reply. So am I supposed to have subscribe/unsubscribe methods in my code? Can you provide a simple code snippet so that the flow is clearer?
Can you explain a little more about what you are trying to do, so I can make sure any example addresses your use case? I assume you have some XML configuration and code; could you post that as well?
Jonas Partner
OpenCredo
Thanks for the reply again. I'm just using the WebServiceDemo example, and my aim is to have the consumers/clients notified if there's any change in the webservice. My XML config right now is below. I want to use a publish-subscribe channel to make this work, but I'm not sure that making the sendChannel publish-subscribe will be enough on its own. What else needs to be done in terms of code/XML config? I would appreciate help with a code snippet showing how publish-subscribe can be integrated into this example. Thanks
gid.xml
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:tool="http://www.springframework.org/schema/tool"
    xmlns:integration="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/tool
        http://www.springframework.org/schema/tool/spring-tool-2.5.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration-ws-1.0.xsd ">

    <message-bus/>

    <channel id="sendChannel"/>

    <!-- The service activator receives from the 'sendChannel', invokes the handler,
         and sends the reply Message to the 'replyChannel'. -->
    <service-activator input-channel="sendChannel" ref="msg1Converter" output-channel="replyChannel"/>

    <!-- The handler invokes the WebService for the given URI and returns a reply Message. -->
    <ws-handler id="msgConverter" uri="http://localhost:8080/Trial1/companyservice"/>

    <!-- The response from the service is logged to the console. -->
    <channel-adapter id="replyChannel" target="console"/>
    <console-target id="console"/>

</beans:beans>

GidWS.java
Code:
package org.springframework.integration.gidb.ws;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.ws.handler.AbstractWebServiceHandler;
import java.util.Properties;

public class GidWS {

    /**
     * @param args
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("gid.xml", GidWS.class);
        String requestXml =
                "<hr:CompanyRequest xmlns:hr=\"http://mycompany.com/hr/schemas\">" +
                "<hr:Company>" +
                "<hr:CompanyName>GetAllCompanies</hr:CompanyName>" +
                "</hr:Company>" +
                "</hr:CompanyRequest>";
        Message<String> message = MessageBuilder.fromPayload(requestXml)
                .setHeader(AbstractWebServiceHandler.SOAP_ACTION_PROPERTY_KEY, "http://mycompany.com/hr/schemas")
                .build();
        ((MessageChannel) context.getBean("sendChannel")).send(message);
    }
}
Making the send channel publish-subscribe would allow you to have multiple consumers registered to receive each message sent on this channel. For example, the configuration below is valid, with every message placed on the pubSub channel being received by both consumerOne and consumerTwo. I am not sure if this is really what you want, since I am not clear what you mean by 'notified if there's any change in the webservice'.
Code:
<si:publish-subscribe-channel id="pubSub" />

<si:service-activator ref="consumerOne" method="onMessage" input-channel="pubSub" />
<si:service-activator ref="consumerTwo" method="onMessage" input-channel="pubSub"/>

<bean id="consumerOne" class="com.springsource.siexperiments.pubsub.TestConsumer"/>
<bean id="consumerTwo" class="com.springsource.siexperiments.pubsub.TestConsumer"/>
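The TestConsumer class referenced in the configuration is not shown in this thread. A minimal sketch of what such a class might look like follows, assuming it is a plain object whose onMessage method takes the message payload as a String; the counter and getReceivedCount are my own additions for illustration. No annotations or framework interfaces are involved, since the service-activator names the method to call.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer; the real TestConsumer from the configuration is not shown.
class TestConsumer {
    // Atomic because onMessage may be called from several pool threads at once.
    private final AtomicInteger received = new AtomicInteger();

    // Matches method="onMessage" in the service-activator configuration.
    // Assumption: the payload arrives as a String.
    public void onMessage(String payload) {
        int count = received.incrementAndGet();
        System.out.println(Thread.currentThread().getName()
                + " received message " + count + ": " + payload);
    }

    public int getReceivedCount() {
        return received.get();
    }
}
```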
In the above configuration each consumer's onMessage method will be invoked by the thread sending the message, so the sender is blocked while the consumer handles the message. The configuration below adds a task executor, so threads from the pool will be used to invoke the consumers.
Code:
<si:publish-subscribe-channel id="pubSub" task-executor="poolOne" />

<si:thread-pool-task-executor id="poolOne" max-size="10" />

<si:service-activator ref="consumerOne" method="onMessage" input-channel="pubSub" />
<si:service-activator ref="consumerTwo" method="onMessage" input-channel="pubSub"/>

<bean id="consumerOne" class="com.springsource.siexperiments.pubsub.TestConsumer"/>
<bean id="consumerTwo" class="com.springsource.siexperiments.pubsub.TestConsumer"/>
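The difference between the two configurations comes down to which thread runs the consumer. The plain-Java sketch below shows that difference with java.util.concurrent only; ThreadingSketch and runAndReportThread are hypothetical names, and the direct executor (`Runnable::run`) is my stand-in for "no task executor configured".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only illustrates caller-thread vs pool-thread execution.
class ThreadingSketch {

    // Runs a task on the given executor and returns the name of the thread that ran it.
    static String runAndReportThread(Executor executor) {
        CompletableFuture<String> threadName = new CompletableFuture<>();
        executor.execute(() -> threadName.complete(Thread.currentThread().getName()));
        try {
            return threadName.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        }
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;                          // runs on the caller, so the sender blocks
        ExecutorService pool = Executors.newFixedThreadPool(10);  // runs on a pool thread
        try {
            System.out.println("sender thread: " + Thread.currentThread().getName());
            System.out.println("direct:        " + runAndReportThread(direct));
            System.out.println("pooled:        " + runAndReportThread(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the direct executor the reported thread is the sender's own thread; with the pool it is one of the pool's worker threads, so the sender can return as soon as the work has been handed over.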
Jonas Partner
OpenCredo
Thank you for the clarification. What I meant by 'notified if there's any change in the webservice' was this: if something changes, for example the data in the Spring web service (which I already have), I want to notify all the registered consumers about the change via Spring Integration, and I was wondering whether publish-subscribe alone can do this on the Spring Integration side. If I understood correctly, comparing your explanation with my code, the "replyChannel" (the service activator's output channel) is the consumer, and the "sendChannel" (the service activator's input channel) is the publisher if I make it a publish-subscribe channel, right? Which means I can have many consumers, i.e. replyChannel1, replyChannel2, ..., right? I will try out the configuration from your reply. Anyway, thank you once again.
Last edited by ashleyvijay; Oct 23rd, 2008 at 07:31 AM.
May I know where the schema for thread-pool-task-executor is available? Also, what does max-size mean? Thanks
Last edited by ashleyvijay; Oct 23rd, 2008 at 07:38 AM.
thread-pool-task-executor is in the schema for RC1, which is imminent. You can wire up a bean as an alternative.
Code:
<bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" />
See the ThreadPoolTaskExecutor javadoc for details of the configuration options.
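On what a maximum size means in practice: ThreadPoolTaskExecutor is built on java.util.concurrent.ThreadPoolExecutor, so the sketch below uses that class directly. I am assuming here that max-size corresponds to the executor's maximum pool size, which I have not checked against the RC1 schema. PoolSizeSketch, poolSizeAfter and the numbers (core size 2, maximum size 4, queue capacity 2) are hypothetical, chosen only to keep the demo small.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of core size vs maximum size in java.util.concurrent.ThreadPoolExecutor.
class PoolSizeSketch {

    // Submits `tasks` blocking tasks (at most 6 here; a 7th would be rejected)
    // and returns how many threads the pool has created.
    static int poolSizeAfter(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                           // core size
                4,                           // maximum size
                60, TimeUnit.SECONDS,        // idle time before extra threads are retired
                new ArrayBlockingQueue<>(2)  // bounded queue of waiting tasks
        );
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("2 tasks -> pool size " + poolSizeAfter(2)); // 2: core threads only
        System.out.println("4 tasks -> pool size " + poolSizeAfter(4)); // 2: extra tasks wait in the queue
        System.out.println("6 tasks -> pool size " + poolSizeAfter(6)); // 4: queue full, pool grows to the maximum
    }
}
```

The point to take away is that the maximum size is an upper bound, not a target: the pool only grows past its core size once the queue is full. With an unbounded queue, which is the ThreadPoolTaskExecutor default, the maximum is never reached.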
Jonas Partner
OpenCredo