Results 1 to 2 of 2

Thread: Overwriting aggregateHeaders

  1. #1
    Join Date
    Mar 2012
    Posts
    1

    Default Overwriting aggregateHeaders

    I'm using a pretty basic aggregator (spring 2.1) with all the default implementations and it works just fine: aggregates messages and releases the groups when completed. The only change I need is to add some headers with customized names and those names come from the aggregated payloads. Here is my current aggregator definition:
    Code:
    	<int:aggregator id="filesAggregator" 
    			auto-startup="true" 
    			input-channel="inChannel" 
    			output-channel="outChannel" 
    			discard-channel="expiredChannel" 
    			message-store="filesAggregatorMessageStore"
    			send-partial-result-on-expiry="false" 
    			expire-groups-upon-completion="true" /> 
    
    	<bean id="filesAggregatorMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
    
    	<bean id="filesAggregatorReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
    		<property name="messageGroupStore" ref="filesAggregatorMessageStore"/>
    		<property name="timeout" value="30000"/>
    	</bean >
    	
    	<task:scheduler id="reapersScheduler" pool-size="1"/>	
    	<task:scheduled-tasks scheduler="reapersScheduler">	
    	    <task:scheduled ref="filesAggregatorReaper" method="run" fixed-rate="5000"/>
    	</task:scheduled-tasks>
    
            <task:executor id="expiredFilesChannelExecutor" pool-size="2"/>
    I've searched the API and documentation and came across the method aggregateHeaders from AbstractAggregatingMessageGroupProcessor. Well, after so many tryes and no success I came to the forum. How can I keep using all the defaults and overwrite aggregateHeaders?

    Regards,

    Ricardo.

  2. #2
    Join Date
    Jan 2009
    Location
    Ukraine, Kharkov
    Posts
    645

    Default

    Hello
    So, what I can propose here it's regular bean definitions:
    HTML Code:
    <bean class="org.springframework.integration.config.ConsumerEndpointFactoryBean">
    		<property name="inputChannel" ref=""/>
    		<property name="pollerMetadata" ref=""/>
    		<property name="handler">
    			<bean class="org.springframework.integration.aggregator.AggregatingMessageHandler">
    				<constructor-arg name="processor" ref="aggregateHeadersDefaultAggregatingMessageGroupProcessorBean"/>
    				<property name="correlationStrategy" ref=""/>
    				<property name="discardChannel" ref=""/>
    				<property name="expireGroupsUponCompletion" value=""/>
    				<property name="messageStore" ref=""/>
    				<property name="outputChannel" ref=""/>
    			</bean>
    		</property>
    	</bean>
    your aggregateHeadersDefaultAggregatingMessageGroupProc essorBean should be somethis like this:
    Code:
    public class AggregateHeadersDefaultAggregatingMessageGroupProcessor extends DefaultAggregatingMessageGroupProcessor {
       protected Map<String, Object> aggregateHeaders(MessageGroup group) {
            // your logic
       }
    }
    Hope, this help,
    Artem Bilan

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •