Hi,
I am working on an application that uses an aggregator. It takes its input from an input channel, aggregates the messages, and then sends the result to the destination channel.
The aggregator's release strategy releases a group when the message count reaches 10. For example, if you send 20 messages, they are released in groups of 10, so there will be 2 groups (10*2).
My question: if I send only 6 messages instead of 20, the aggregator keeps waiting because the message count never reaches 10. To handle this I have configured a MessageGroupStoreReaper in the configuration, and I have also set send-partial-result-on-expiry="true" on the aggregator. But when I send six messages, the program goes into a loop and does not release those 6 messages. Please guide me on where things are going wrong.
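To make the expected behaviour concrete, here is a minimal plain-Java sketch of what I am hoping to see. It does not use any Spring Integration classes; the class `ExpectedAggregation` and its methods `add`, `expire` and `groupSizes` are hypothetical names made up for this illustration, and `expire()` only stands in for whatever the reaper is supposed to trigger.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the expected grouping; not the Spring Integration implementation. */
public class ExpectedAggregation {

    // Same threshold as the release strategy described above (assumption: fixed at 10).
    static final int RELEASE_SIZE = 10;

    private final List<String> pending = new ArrayList<String>();
    private final List<List<String>> released = new ArrayList<List<String>>();

    /** Adds a message and releases the group once it holds RELEASE_SIZE messages. */
    void add(String payload) {
        pending.add(payload);
        if (pending.size() == RELEASE_SIZE) {
            flush();
        }
    }

    /** Models expiry: whatever is still pending is sent out as a partial group. */
    void expire() {
        if (!pending.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        released.add(new ArrayList<String>(pending));
        pending.clear();
    }

    /** Returns the sizes of the groups released for the given number of messages. */
    public static List<Integer> groupSizes(int messageCount, boolean expireAtEnd) {
        ExpectedAggregation agg = new ExpectedAggregation();
        for (int i = 0; i < messageCount; i++) {
            agg.add("My Sample Message:" + i);
        }
        if (expireAtEnd) {
            agg.expire();
        }
        List<Integer> sizes = new ArrayList<Integer>();
        for (List<String> group : agg.released) {
            sizes.add(group.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(groupSizes(20, false)); // [10, 10]
        System.out.println(groupSizes(6, false));  // []  (what I currently observe)
        System.out.println(groupSizes(6, true));   // [6] (what I expect after expiry)
    }
}
```

The last case is the one I cannot get to work: with 6 messages I expect a single partial group of 6 once the group expires.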
The configuration file is shown below.
Code:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:si="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task-3.0.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
           http://www.springframework.org/schema/integration/stream
           http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="maxConnections" value="8" />
        <property name="maximumActive" value="500" />
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>

    <si:gateway id="gateway"
                service-interface="com.sample.agg.Gateway"
                default-request-channel="aggregator-input-channel">
    </si:gateway>

    <si:channel id="aggregator-input-channel">
        <si:queue capacity="100" />
    </si:channel>

    <si:aggregator id="exampleAggregator"
                   input-channel="aggregator-input-channel"
                   output-channel="aggregator-output-channel"
                   ref="sampleAggregator"
                   method="aggregateMessagaes"
                   send-partial-result-on-expiry="true"
                   correlation-strategy="exampleCorrelationBean"
                   correlation-strategy-method="correlationStrategy"
                   release-strategy="exampleReleaseStrategy"
                   release-strategy-method="releaseStrategy"
                   message-store="exampleMessageStore">
    </si:aggregator>

    <bean id="sampleAggregator" class="com.sample.agg.ExampleAggregator"></bean>
    <bean id="exampleCorrelationBean" class="com.sample.agg.ExampleCorrelationBean" />
    <bean id="exampleReleaseStrategy" class="com.sample.agg.ExampleReleaseStrategy"></bean>
    <bean id="exampleMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />

    <bean id="exampleReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
        <property name="messageGroupStore" ref="exampleMessageStore" />
        <!-- <property name="timeout" value="5000" />-->
    </bean>

    <task:scheduler id="exampleScheduler" />
    <task:scheduled-tasks scheduler="exampleScheduler">
        <task:scheduled ref="exampleReaper" method="run" fixed-rate="1000" />
    </task:scheduled-tasks>

    <si:channel id="aggregator-output-channel">
        <si:queue capacity="1000" />
    </si:channel>

    <si:service-activator input-channel="aggregator-output-channel"
                          ref="printerService"
                          method="printString"></si:service-activator>

    <bean id="printerService" class="com.sample.agg.PrinterService"></bean>

    <si:poller max-messages-per-poll="1" id="defaultPoller" default="true">
        <si:interval-trigger interval="1000" />
    </si:poller>
</beans>
The main application from which I send the messages is shown below.
Code:
package com.sample.agg;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;

public class TestApp {

    public static void main(String[] args) throws InterruptedException {
        //ApplicationContext context = new ClassPathXmlApplicationContext("resources/config.xml");
        ApplicationContext context = new ClassPathXmlApplicationContext("config.xml");
        Gateway gate = (Gateway) context.getBean("gateway");

        // Send only 6 messages, one per second, so the count of 10 is never reached
        for (int i = 0; i < 6; i++) {
            Message<String> msg = MessageBuilder.withPayload("My Sample Message:" + i).build();
            Thread.sleep(1000);
            gate.send(msg);
        }
    }
}
The correlation bean is shown below.
Code:
package com.sample.agg;

import org.springframework.integration.Message;

public class ExampleCorrelationBean {

    // Not grouping based on correlation ids: every message gets the same key
    public String correlationStrategy(Message<String> request) {
        return "NO GROUPING";
    }
}
The release strategy bean is shown below.
Code:
package com.sample.agg;

import java.util.List;

import org.springframework.integration.Message;

public class ExampleReleaseStrategy {

    // Releases when message count reaches 10
    public boolean releaseStrategy(List<Message<String>> requests) {
        if (requests.size() == 10) {
            System.out.println("Message count reached 10, so released for aggregation");
            return true;
        }
        return false;
    }
}
I am also attaching a zip of the application. Please advise why it is not releasing those 6 messages.



