Thread: SI and BlazeDS failure when no consumers (blazeds sessions)

  1. #1
    Join Date
    Feb 2006
    Location
    Los Angeles, CA
    Posts
    79

    Hello. I'm trying to build a solution in which a Spring Integration process picks up files from the file system and sends them through the pipeline. Eventually they get shoved through JMS and come out the other end via a message-driven JMS adapter, out on the channel and straight to the destination configured by <flex:integration-message-destination ... />. This works fine IF there is somebody logged in. It blows chunks if I restart the server and drop a file, because (reasonably so) the message-driven adapter complains that there are no subscribers when it tries to send the message out. Which is true. But there's hardly anything I can do about that.

    I know of a few ways to work around this. I can choose ignore-failures on the dispatcher, but that's missing the point: it isn't a failure, per se. I could also create a component before the final channel and eat the success/failure of the send operation by injecting the channel and sending the message manually. Thus, as far as Spring Integration is concerned, the processing dead-ends right before the flex destination. I can afford to ignore these messages as they're not document messages but event messages: they serve only to refresh the UI that's already loaded on a given client's screen. If nobody's logged on, then there's no screen to refresh. Next time they log in they'll get a fresh view of the data, no matter what.
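
The "eat the success/failure" workaround described above could look roughly like the following. This is only a sketch in plain Java: `EventForwarder` and its nested `Channel` interface are hypothetical stand-ins for the injected channel, not Spring Integration types, and the assumption that a channel might throw as well as return false is mine.

```java
/**
 * Illustrative model only: EventForwarder and Channel are hypothetical
 * stand-ins, not Spring Integration types.
 */
final class EventForwarder {

    /** Minimal stand-in for a channel whose send reports whether anyone received the message. */
    interface Channel {
        boolean send(String payload);
    }

    private final Channel target;
    private int dropped;

    EventForwarder(Channel target) {
        this.target = target;
    }

    /** Sends the event and deliberately swallows a failed delivery. */
    void handle(String payload) {
        boolean delivered;
        try {
            delivered = target.send(payload);
        } catch (RuntimeException e) {
            // Assumption: a channel may also signal "no subscribers" by throwing.
            delivered = false;
        }
        if (!delivered) {
            dropped++; // keep a count so dropped events stay observable
        }
    }

    int droppedCount() {
        return dropped;
    }
}
```

Because `handle` returns nothing, the upstream flow never sees the delivery result, which is the "dead end" effect described in the post. The counter is there only so that silently dropped events can still be logged or monitored.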

    One approach I thought about was using ChannelInterceptors on the channel and vetoing the send if there are no subscribers. It's just that I don't know how to ascertain that there are no subscribers from inside a ChannelInterceptor.

    Any insight into this would be appreciated. I wasn't sure if I should post this on the Spring Integration forum or on the BlazeDS forum since it's definitely a problem unique to BlazeDS in conjunction with SI...

    Thanks,
    Josh Long
    Spring Developer Advocate
    SpringSource, a division of VMware

    http://blog.SpringSource.org
    http://www.joshlong.com

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,840

    If the messages are really event messages, I would typically recommend using a "publish-subscribe-channel" instead. That would make it easy to add additional consumers for one thing, but it would also avoid the dispatcher failure.

    I've considered this general issue a bit, and I think there are two things we could do within Spring Integration: 1) add a boolean attribute to the 'channel' element ('require-subscribers' or something) or 2) support a header within the message itself. The latter option would allow for explicit labeling of "event" messages.
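
To make the two options concrete, here is a small plain-Java model of how they might behave. Everything in it is hypothetical: `OptInEventChannel`, the `requireSubscribers` flag, and the `eventMessage` header name are illustrative guesses, not existing Spring Integration API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Illustrative model only; none of these names exist in Spring Integration. */
final class OptInEventChannel {

    /** Hypothetical header that labels a message as a droppable "event". */
    static final String EVENT_HEADER = "eventMessage";

    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private final boolean requireSubscribers;

    /** Option 1: a channel-level flag, analogous to a 'require-subscribers' attribute. */
    OptInEventChannel(boolean requireSubscribers) {
        this.requireSubscribers = requireSubscribers;
    }

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Returns false only when nobody is subscribed AND the message was not allowed to be dropped. */
    boolean send(String payload, Map<String, ?> headers) {
        if (subscribers.isEmpty()) {
            // Option 2: the message itself opts in via a header.
            boolean isEvent = Boolean.TRUE.equals(headers.get(EVENT_HEADER));
            return !requireSubscribers || isEvent;
        }
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return true;
    }
}
```

In this model the flag changes behavior for every message on the channel, while the header lets a single channel carry both "must be delivered" and "fine to drop" messages.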

    Let me know if there is some reason you *don't* want to use "publish-subscribe-channel", and if you have a preference for the 2 options I mentioned above, let me know that as well.

    Thanks,
    Mark

  3. #3
    Join Date
    Feb 2006
    Location
    Los Angeles, CA
    Posts
    79

    I did try the publish-subscribe option (thinking that it should work as you describe.) Apologies for not having mentioned this...

    Here's my integration (roughly):

    Code:
     
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns:integration="http://www.springframework.org/schema/integration"
    	xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
    	xmlns:util="http://www.springframework.org/schema/util" xmlns:tool="http://www.springframework.org/schema/tool"
    	xmlns:lang="http://www.springframework.org/schema/lang" xmlns:jms="http://www.springframework.org/schema/integration/jms"
        xmlns:file="http://www.springframework.org/schema/integration/file"
    	xmlns:amq="http://activemq.apache.org/schema/core" xmlns:flex="http://www.springframework.org/schema/flex"
    	xsi:schemaLocation="
        	http://www.springframework.org/schema/util
        	http://www.springframework.org/schem...g-util-3.0.xsd
    		http://www.springframework.org/schema/tool
    		http://www.springframework.org/schem...g-tool-3.0.xsd
    		http://www.springframework.org/schema/lang
    		http://www.springframework.org/schem...g-lang-3.0.xsd
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schem...-beans-3.0.xsd
            http://www.springframework.org/schema/context
    		http://www.springframework.org/schem...ontext-3.0.xsd
            http://www.springframework.org/schema/integration
    		http://www.springframework.org/schem...ration-1.0.xsd
            http://www.springframework.org/schema/integration/file
    		http://www.springframework.org/schem...n-file-1.0.xsd
    		http://www.springframework.org/schema/integration/jms
    		http://www.springframework.org/schem...on-jms-1.0.xsd
    		http://activemq.apache.org/schema/core
    		http://activemq.apache.org/schema/co...core-5.3.0.xsd
    		http://www.springframework.org/schema/flex
            http://www.springframework.org/schem...g-flex-1.0.xsd
            ">
    
    
        <!-- general Spring stuff -->
    	<context:annotation-config />
    	<context:component-scan base-package="c.a.swr.flex.auction" />
    
    
        <!-- bus stuff -->
    	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="connectionFactory" />
    	<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" p:sessionCacheSize="10" p:cacheProducers="false">
    		<property name="targetConnectionFactory">
    			<amq:connectionFactory   brokerURL="tcp://localhost:61616" />
    		</property>
    	</bean>
    
        <amq:topic id="itemPosted" name="itemPosted" physicalName="itemPosted" />
        <amq:topic id="bidPosted" name="bidPosted" physicalName="bidPosted" />
        <amq:topic id="bidAccepted" name="bidAccepted" physicalName="bidAccepted" />
    
        <bean id="itemFilesMount" class="org.springframework.core.io.FileSystemResource"  >
            <constructor-arg value="#{ systemProperties['user.home']+'/flexCsvFiles' }"/>
        </bean>
    
        <integration:channel id="inboundItemFiles"/>
        <integration:channel id="inboundItemFileStrings"/>
        <integration:channel id="inboundItems"/>
        <integration:publish-subscribe-channel  id="inboundItemsPosted" />
    
    
        <file:inbound-channel-adapter auto-create-directory="true" directory="#{itemFilesMount.file.absolutePath}" channel="inboundItemFiles">
            <integration:poller> <integration:interval-trigger interval="10000"/> </integration:poller>
        </file:inbound-channel-adapter>
        <file:file-to-string-transformer input-channel="inboundItemFiles" output-channel="inboundItemFileStrings" delete-files="true" />
        <integration:transformer input-channel="inboundItemFileStrings" ref="fileToItemTransformer" output-channel="inboundItems"/>
        <integration:service-activator input-channel="inboundItems" ref="itemCreationServiceActivator"/>
        <jms:message-driven-channel-adapter  connection-factory="connectionFactory" destination="itemPosted" channel="inboundItemsPosted" />
    
    	<flex:message-broker services-config-path="/WEB-INF/flex/services-config.xml">
    		<flex:message-service default-channels="my-amf" />
    	</flex:message-broker>
    
        <flex:integration-message-destination
                channels="my-amf"
                id="itemPostedDestination"
                message-channel="inboundItemsPosted"            
                />
        
       
       
    </beans>
    Note the channel named 'inboundItemsPosted'. It carries the inbound messages from the JMS message-driven channel adapter to the integration-message-destination. I debugged this and found that inside BroadcastingDispatcher, in the dispatch(Message<?> message) method, we have the following logic:

    Code:
    public boolean dispatch(Message<?> message) {
    		boolean dispatched = false;
    		int sequenceNumber = 1;
    		List<MessageHandler> handlers = this.getHandlers();
    		int sequenceSize = handlers.size();
    		for (final MessageHandler handler : handlers) {
    			final Message<?> messageToSend = (!this.applySequence) ? message
    				: MessageBuilder.fromMessage(message)
    						.setSequenceNumber(sequenceNumber++)
    						.setSequenceSize(sequenceSize)
    						.setCorrelationId(message.getHeaders().getId())
    						.setHeader(MessageHeaders.ID, UUID.randomUUID())
    						.build();
    			if (this.taskExecutor != null) {
    				this.taskExecutor.execute(new Runnable() {
    					public void run() {
    						invokeHandler(handler, messageToSend);
    					}
    				});
    				dispatched = true;
    			}
    			else {
    				boolean success = this.invokeHandler(handler, messageToSend);
    				dispatched = (success || dispatched);
    			}
    		}
    		return dispatched;
    	}
    As there are no handlers in the handlers collection, we never get a chance to set dispatched = true, so the method returns false as its success status. This boolean is then returned from protected boolean doSend(Message<?> msg, long timeout) in AbstractSubscribableChannel, which in turn is invoked by public final boolean send(Message<?> msg, long timeout) in AbstractMessageChannel, which is itself called by the send(Message<?> msg) variant; each returns the boolean as the indication of whether or not the message was sent. Anyway, this bubbles up all the way back to the MessageChannelTemplate's doSend method.
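
The return-value behavior can be reproduced in isolation. The following is a stripped-down model of just the synchronous branch of the loop quoted above, with handlers represented as plain `Predicate<String>` values; `BroadcastDispatchModel` is a hypothetical name and this is not the framework class.

```java
import java.util.List;
import java.util.function.Predicate;

/** Simplified model of the synchronous dispatch loop; not the real BroadcastingDispatcher. */
final class BroadcastDispatchModel {

    /** Returns true if at least one handler reported success. */
    static boolean dispatch(List<Predicate<String>> handlers, String message) {
        boolean dispatched = false;
        for (Predicate<String> handler : handlers) {
            boolean success = handler.test(message);
            dispatched = success || dispatched;
        }
        // With an empty handler list the loop body never runs,
        // so the initial false is what the caller sees.
        return dispatched;
    }
}
```

Calling `BroadcastDispatchModel.dispatch(List.of(), "event")` returns false, which is the value that propagates up through the send methods in the case described here.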

    So, I understand why this works that way, normally. Most integrations are dealing with static components that don't subscribe/disconnect as frequently as users on a web app... so I'm not recommending we change that fundamental behavior. Instead, I like the idea of that opt-in configuration option at the higher level that you suggested. I am fundamentally unsure of which option would be better, too. Functionality exposed as a message header is definitely the most flexible since it means it could be dynamic / arbitrary and still be an opt-in behavior.

    Anyway, all input's greatly appreciated.

    Thanks,
    Josh Long
