Results 1 to 8 of 8

Thread: How to add the suscribers to a publisher?

  1. #1
    Join Date
    Dec 2008
    Posts
    20

    Default How to add the suscribers to a publisher?

    Hi!

    I´ve a spring context like this:
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:i="http://www.springframework.org/schema/integration"
    	xmlns:jdbc="http://www.springframework.org/schema/integration/jdbc"
    	xsi:schemaLocation="http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
    		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    		http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
    		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    
    	<bean id="dataSource" class="oracle.jdbc.pool.OracleDataSource"
    		destroy-method="close">
    		<property name="connectionCachingEnabled" value="true" />
    		<property name="URL" value="jdbc:oracle:thin:@//localhost:1521/XE" />
    		<property name="password" value="admin" />
    		<property name="user" value="admin" />
    		<property name="connectionCacheProperties">
    			<props merge="default">
    				<prop key="MinLimit">3</prop>
    				<prop key="MaxLimit">20</prop>
    			</props>
    		</property>
    	</bean>
    
    	<i:channel id="coffeeChannel" />
    
    	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    	    <property name="dataSource" ref="dataSource"/>
    	</bean>
    
    	<bean id="CoffeBeverageMapper" class="org.springframework.integration.support.CoffeBeverageMapper" />
    
    	<jdbc:inbound-channel-adapter id="findAllCoffeeBeverages"
    		channel="coffeeChannel" data-source="dataSource"
    		query="SELECT id, coffee_name, coffee_description FROM coffee_beverages ORDER BY ID"
    		row-mapper="CoffeBeverageMapper"
    		max-rows-per-poll="100">
    		<i:poller trigger="trigerConsulta">
    			<i:transactional propagation="REQUIRED"
    				transaction-manager="transactionManager" />
    		</i:poller>
    	</jdbc:inbound-channel-adapter>
    
    	<bean id="trigerConsulta" class="org.springframework.scheduling.support.PeriodicTrigger">
    		<constructor-arg name="period" value="15000"/>
    		<property name="initialDelay" value="10000"/>
    	</bean>
    
    </beans>
    and I call it in the main like this:
    Code:
    final AbstractApplicationContext context =
    			new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml");
    
    		context.registerShutdownHook();
    				List<CoffeeBeverage> coffeeBeverages = service.findAllCoffeeBeverages();
    
    				if (coffeeBeverages!=null) {
    					for (CoffeeBeverage coffeeBeverage : coffeeBeverages) {
    						System.out.println(String.format("%s - %s", coffeeBeverage.getId(),
    																	coffeeBeverage.getName()));
    					}
    				} else {
    					System.out.println("coffeeBeverages: null");
    				}
    So when I run it with maven I get:
    Code:
    12:03:51.528 ERROR [task-scheduler-1][org.springframework.integration.handler.LoggingHandler] org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers for channel coffeeChannel.
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:82)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
    	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
    	at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
    	at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
    	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:97)
    	at org.springframework.integration.endpoint.AbstractTransactionSynchronizingPollingEndpoint.doPoll(AbstractTransactionSynchronizingPollingEndpoint.java:82)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:144)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:319)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110)
    	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    	at $Proxy15.call(Unknown Source)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:236)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    	at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
    	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    	at java.lang.Thread.run(Thread.java:662)
    Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:109)
    	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    	... 34 more
    1. Why is it asking me for suscribers if I´m not using a Publish channel?
    2. So how do I add the suscribers?

    Thanks in advance.

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    Dispatcher has no subscribers for channel coffeeChannel.
    This simply means you have no endpoint declared to receive messages from coffeeChannel.

    Your JDBC inbound adapter is polled, creates a message and sends it to 'coffeeChannel' but there is nothing in your configuration to receive that message.

    What do you want to "do" with the message?

    Typically, you'd have some service invoked, such as

    Code:
    <i:service-activator input-channel="coffeeChannel" ref="myBean" output-channel="nextChannel" />
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Dec 2008
    Posts
    20

    Default

    I just want to print the message in console and sent it as you suggest to the next channel.
    So I did as you suggested but now I get:
    Code:
     Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null: InvocationTargetException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Cannot resolve reference to bean 'org.springframework.integration.config.ServiceActivatorFactoryBean#0' while setting bean property 'handler'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ServiceActivatorFactoryBean#0': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.List] for method match: [public void org.springframework.integration.model.CoffeeBeverage.setDescription(java.lang.String), public void org.springframework.integration.model.CoffeeBeverage.setId(java.lang.Integer), public java.lang.String org.springframework.integration.model.CoffeeBeverage.getDescription()] -> [Help 1]
    org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    Well, without seeing your configuration, I can't really tell what you did - the error message indicates you sent it to a bean with more than one method and the framework couldn't figure out which method to call - in that case, you have to add a 'method' attribute to the service-activator.

    However, if you just want to send it to the console, add a <int-stream:stdout-channel-adapter channel="coffeeChannel" /> and send it there.

    I suggest you browse the other samples (e.g. JMS) for examples.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5
    Join Date
    Dec 2008
    Posts
    20

    Default

    Ok I decided to simplify it, so I let my spring context like this:

    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    	xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    	xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    		http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
    		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
    	<bean id="dataSource" class="oracle.jdbc.pool.OracleDataSource"
    		destroy-method="close">
    		<property name="connectionCachingEnabled" value="true" />
    		<property name="URL" value="jdbc:oracle:thin:@//localhost:1521/XE" />
    		<property name="password" value="admin" />
    		<property name="user" value="admin" />
    		<property name="connectionCacheProperties">
    			<props merge="default">
    				<prop key="MinLimit">3</prop>
    				<prop key="MaxLimit">20</prop>
    			</props>
    		</property>
    	</bean>
    
    	<int:channel id="findCoffeeProcedureRequestChannel" />
    	<int:channel id="findAllProcedureRequestChannel" />
    	<int:channel id="insertCoffee" />
    
    	<int:gateway id="gateway" default-request-timeout="5000"
    		default-reply-timeout="5000"
    		service-interface="org.springframework.integration.service.CoffeeService">
    		<int:method name="findCoffeeBeverage" request-channel="findCoffeeProcedureRequestChannel" />
    		<int:method name="findAllCoffeeBeverages" request-channel="findAllProcedureRequestChannel" />
    		<int:method name="insertCoffee" request-channel="insertCoffee" />
    	</int:gateway>
    
    	<int-jdbc:stored-proc-outbound-gateway
    		id="outbound-gateway-storedproc-find-coffee" data-source="dataSource"
    		request-channel="findCoffeeProcedureRequestChannel"
    		skip-undeclared-results="true" stored-procedure-name="FIND_COFFEE_BEVERAGES"
    		expect-single-result="true" is-function="true">
    		<int-jdbc:parameter name="PID" expression="payload" />
    	</int-jdbc:stored-proc-outbound-gateway>
    
    	<int-jdbc:stored-proc-outbound-gateway
    		id="outbound-gateway-storedproc-find-all" data-source="dataSource"
    		request-channel="findAllProcedureRequestChannel" expect-single-result="true"
    		stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES" is-function="true">
    		<int-jdbc:returning-resultset name="coffeeBeverages"
    			row-mapper="org.springframework.integration.support.CoffeBeverageMapper" />
    	</int-jdbc:stored-proc-outbound-gateway>
    
    	<int-jdbc:stored-proc-outbound-gateway id="storeProcedureOutbound"
    		request-channel="insertCoffee" data-source="dataSource"
    		stored-procedure-name="agregar_edi">
    		<int-jdbc:parameter name="p_COFFEE_NAME" expression="payload.p_COFFEE_NAME" />
    		<int-jdbc:parameter name="p_COFFEE_DESCRIPTION" expression="payload.p_COFFEE_DESCRIPTION" />
    	</int-jdbc:stored-proc-outbound-gateway>
    
    </beans>
    My main is like this:
    Code:
    		final AbstractApplicationContext context =
    			new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml");
    
    		context.registerShutdownHook();
    
    		final CoffeeService service = context.getBean(CoffeeService.class);
    service.insertCoffee("Nescafe", "Huele rico!");
    So when I run it I get:
    Code:
    Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null: InvocationTargetException: failed to convert object to Message: At most one parameter (or expression via method-level @Payload) may be mapped to the payload or Message. Found more than one on method [public abstract void org.springframework.integration.service.CoffeeService.insertCoffee(java.lang.String,java.lang.String)] -> [Help 1]
    org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.2:java (default-cli) on project oracle-stored-procedures: An exception occured while executing the Java class. null
    	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
    	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
    	at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
    	at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
    	at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
    	at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
    	at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
    	at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
    	at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
    	at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    	at java.lang.reflect.Method.invoke(Method.java:597)
    	at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:290)
    	at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:230)
    	at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:409)
    	at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:352)
    Caused by: org.apache.maven.plugin.MojoExecutionException: An exception occured while executing the Java class. null
    	at org.codehaus.mojo.exec.ExecJavaMojo.execute(ExecJavaMojo.java:346)
    	at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:101)
    	at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:209)
    	... 19 more
    why is it failing to convert my object to a message?
    Thanks in advance.

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,036

    Default

    A message can only have one payload - you are sending 2 strings - the framework can't figure out which is the payload.

    In any case, your stored procedure is looking for two properties on the payload

    Code:
    <int-jdbc:parameter name="p_COFFEE_NAME" expression="payload.p_COFFEE_NAME" />
    <int-jdbc:parameter name="p_COFFEE_DESCRIPTION" expression="payload.p_COFFEE_DESCRIPTION" />
    This implies the payload is a java bean with two properties, and 2 getters for those properties...

    Code:
    getP_COFFEE_NAME() and getP_COFFEE_DESCRIPTION()
    I suggest you browse the other samples, and the documentation (http://static.springsource.org/sprin...eference/html/) to get a better understanding of the framework.
    Last edited by Gary Russell; Feb 28th, 2013 at 02:48 PM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  7. #7
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    644

    Default

    Hi!
    Exception message says to you an advice
    At most one parameter (or expression via method-level @Payload) may be mapped to the payload or Message. Found more than one on method
    So, you should mark one of parameter of your 'insertCoffee' gateway's method as @Payload. From other side: another one should be marked with @Header: http://static.springsource.org/sprin...on-annotations

    Cheers,
    Artem

  8. #8
    Join Date
    Dec 2008
    Posts
    20

    Default

    Ok I read the documentation and I found 2 ways

    stored-proc-outbound-gateway (in the explanation they have the xml shows a definition with 2 parameters)

    Code:
    	<int-jdbc:stored-proc-outbound-gateway
    		id="storeProcedureOutbound" request-channel="insertCoffee" data-source="dataSource"
    		stored-procedure-name="agregar_edi">
    		<int-jdbc:sql-parameter-definition name="P_COFFEE_NAME"/>
    		<int-jdbc:sql-parameter-definition name="P_COFFEE_DESCRIPTION"/>
    		<int-jdbc:parameter name="P_COFFEE_NAME" />
    		<int-jdbc:parameter name="P_COFFEE_DESCRIPTION"  />
    	</int-jdbc:stored-proc-outbound-gateway>

    and

    stored-proc-outbound-channel-adapter

    Code:
    	<int-jdbc:stored-proc-outbound-channel-adapter
    		id="storeProcedureOutbound" channel="insertCoffee" data-source="dataSource"
    		stored-procedure-name="agregar_edi">
    		<int-jdbc:sql-parameter-definition name="P_COFFEE_NAME"/>
    		<int-jdbc:sql-parameter-definition name="P_COFFEE_DESCRIPTION"/>
    		<int-jdbc:parameter name="P_COFFEE_NAME" />
    		<int-jdbc:parameter name="P_COFFEE_DESCRIPTION"  />
    	</int-jdbc:stored-proc-outbound-channel-adapter>
    in the documentation (http://static.springsource.org/sprin...hannel-adapter)

    it shows an example with 2 parameters but it still tells me that I can have 2 parameters.

    I try both, but both return the exception about only accepting 1 parameter.

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •