Page 1 of 4 123 ... LastLast
Results 1 to 10 of 39

Thread: Async gateway reply via callback

  1. #1

    Default Async gateway reply via callback

    I'm starting out with Spring Integration 2.0 M5. I have a gateway which needs to work asynchronously. I see that an async gateway function has been added in v2 but it uses a Future for the result.

    I would like to have an async callback for the reply rather than polling a Future. I hacked this by adding the async callback to the Message headers, and then having the gateway implementation return a Future instance that overrides FutureTask.done() to delegate the reply handling to the callback obtained from the headers.

    An alternate approach I thought might work was to define my gateway with default-reply-channel="replyChannel", and then also define a service activator for this channel to run the callback for the reply:

    <si:gateway id="requestGateway"
    service-interface="foo.RequestGateway"
    default-request-channel="requestChannel"/>
    default-reply-channel="replyChannel"/>

    <!-- Callback for replies -->
    <si:channel id="replyChannel"/>

    <si:service-activator input-channel="replyChannel" ref="requestGateway" method="handleReply"/>

    That would have somewhat elegantly [1] solved the problem at hand however the "replyChannel" defined in si:gateway was not the same as the "replyChannel" triggering the service-activator for the callback, so this did not work.

    Is there a better way to do this? If not, should there be?

    Cheers,
    Raman

    [1] Though there was still the problem of the gateway interface having a return value that would have to be ignored by the caller.

  2. #2
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    You could just have a void-returning method in the gateway interface, and then have that service-activator at the end of the unidirectional message flow. Or, does that solution miss something?

  3. #3

    Default

    Quote Originally Posted by Mark Fisher View Post
    You could just have a void-returning method in the gateway interface, and then have that service-activator at the end of the unidirectional message flow. Or, does that solution miss something?
    I'm not sure if I understand your solution correctly. If the method return is void, then I guess I would have to create a second gateway for the replies and inject that into my request implementation. Which I think would work if there was only one replyChannel but there is more than one -- I didn't specify in the original message but the reply needs to be placed on the replyChannel in the request message header, since there may be more than one request originator (each in separate OSGi bundles). Therefore there is one replyChannel per bundle / request originator.

    In the approach I gave I was hoping that since each replyChannel was defined in a separate OSGi bundle, and each gateway would be mapped to the local bundle's replyChannel, the replies would come back to the correct bundle.

  4. #4
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Default

    I am pretty sure your original solution will work if you make one little change.
    Whenever you define a non-void gateway method, GatewayproxyFactoryBean will create a reply-channel for it automatically and will set a '$replyChannel' header. If you also explicitly define a 'default-reply-channel', this channel will not override the auto-created channel, instead it is internally bridged to that channel. The fact that in your configuration the <si:channel id="replyChannel"/> is a DirectChannel makes it a bit confusing since you essentially registering 2 consumers to that reply channel (Gateway itself and service-activator). So what you need to do is:
    1. Make that channel pub-sub channel.
    2. Set defalut-reply-timeout on the gateway to some short time. This would allow gateway invocation to return quickly and user will ignore the response if any. Because the reply-channel is pub-sub channel, the reply message will also be dispatched to the service-activator, thus allowing both service-activator and gateway to complete successfully.

    Give it a shot and let us know.

  5. #5
    Join Date
    Oct 2005
    Location
    Boston, MA
    Posts
    2,854

    Default

    What Oleg described should work, but I guess I was confused by this statement: "there was still the problem of the gateway interface having a return value that would have to be ignored by the caller".

    That made me think that you were really just interested in creating a unidirectional flow without expecting a reply for the gateway caller. Can you clarify?

    Thanks,
    Mark

  6. #6
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Default

    I agree, just because its gonna work it doesn't mean it is the right thing. If Gateway invoker doesn't care about the reply and is going to ignore it anyway, then what's the point of defining a gateway method with non-void signature.

    Have void method on the gateway that send message to an Executor channel (thus ensuring near-imediate return from the gateway method so you can continue) and have service activator with 'output-channel' somewhere downstream putting a message on the output-channel whenever it is finished doing whatever it is doing. I don't even think you need a callback.

  7. #7

    Default

    I think that a) I may not have correctly outlined the problem, or b) I am simply not understanding the proposed solutions. Here is the problem stated very simply:

    OSGi Bundles A, B, C contain consumers (Ac, Bc, Cc) of a service provided by a class Zs in Bundle Z, asynchronously. Yes, service Zs returns a value to the consumers. I don't want the call in Ac, Bc, or Cc to block. Therefore, service Zs should reply to consumers asynchronously. Return value needs to be via callback rather than forcing consumers to poll a Future.

    Consumers Ac, Bc, Cc use a gateway proxy to place the service request onto a Spring Integration request channel defined in Z, and exported via osgi:service.

    Bundle A, B, C:

    Code:
      <bean id="consumer" class="foo.client.Consumer">
        <property name="requestGateway" ref="requestGateway"/>
      </bean>
    
      <!-- Requests go to the requestGateway, which puts messages onto the requestChannel (exposed by some other bundle via OSGi), and replies from the RequestHandler come back to the consumer -->
      <si:gateway id="requestGateway"
        service-interface="foo.server.RequestHandler"
        default-request-channel="requestChannel"
        default-reply-channel="replyChannel"/>
    
      <osgi:reference id="requestChannel" bean-name="requestChannel" interface="org.springframework.integration.core.SubscribableChannel"/>
    Bundle Z:

    Code:
      <si:channel id="requestChannel">
        <si:dispatcher task-executor="requestExecutor"/>
      </si:channel>
    
      <task:executor id="requestExecutor" pool-size="10"/>
    
      <si:service-activator input-channel="requestChannel" ref="requestHandlerImpl"/>
    
      <bean id="requestHandlerImpl" class="foo.server.RequestHandlerImpl"/>
    
      <osgi:service interface="org.springframework.integration.core.SubscribableChannel" ref="requestChannel"/>
    Now this all seems to work if I'm willing to accept either a) blocking in the consumer (in this case the foo.RequestHandler interface gateway method returns a simple value), or b) Future polling in the consumer (in this case the foo.RequestHandler gateway method returns a Future implementation, or c) by having the service return a custom FutureTask that executes the callback in done(), and the callback is itself passed as a message header in the request.

    Solutions a) and b) work but are no good because I don't want blocking or polling, and c) does what I need but is messy. What I am looking for is the "right" way to do this using Spring Integration.

    I hope that clarifies things...

  8. #8

    Default

    Quote Originally Posted by oleg.zhurakousky View Post
    I am pretty sure your original solution will work if you make one little change.
    Whenever you define a non-void gateway method, GatewayproxyFactoryBean will create a reply-channel for it automatically and will set a '$replyChannel' header. If you also explicitly define a 'default-reply-channel', this channel will not override the auto-created channel, instead it is internally bridged to that channel. The fact that in your configuration the <si:channel id="replyChannel"/> is a DirectChannel makes it a bit confusing since you essentially registering 2 consumers to that reply channel (Gateway itself and service-activator). So what you need to do is:
    1. Make that channel pub-sub channel.
    2. Set defalut-reply-timeout on the gateway to some short time. This would allow gateway invocation to return quickly and user will ignore the response if any. Because the reply-channel is pub-sub channel, the reply message will also be dispatched to the service-activator, thus allowing both service-activator and gateway to complete successfully.

    Give it a shot and let us know.


    Hi Oleg,

    Wouldn't it take more time I think even after using the task executor on the input channel it will work like a sequential calls, thus increasing the execution time.

    Here is what I want to say
    Code:
    <si:gateway id="requestGateway" service-interface="foo.RequestGateway"
         default-request-channel="requestChannel"/>
         default-reply-channel="replyChannel"/>
    <!-- The return type of the method in gateway would of non void type say for eg : Object -->
    
    <si:channel id="requestChannel">
    <si:dispatcher load-balancer="round-robin" failover="true"
    			task-executor="poolOne" /> 
    </si:channel>
    
    <si:channel id="replyChannel">
      <si:queue capacity="10" /> 
    </si:channel>  
    
    <si:service-activator input-channel="requestChannel" ref="requestSA" method="handleReply" output-channel="replyChannel" />
    now in this case suppose I run a for loop for 10 request & if for each request
    Code:
    GatewayInterface gateway = (GatewayInterface) context.getBean("requestGateway");
    for (int i = 0; i < 10; i++) {
      TestDVO testDVO = new TestDVO(i);
      System.out.println("Received =>"+ gateway.placeSIRequest(testDVO));
    }
    SA is taking 10 secs then the total execution time will always be 100secs+

    I don't understand why it is taking 100 secs. shouldn't it process all the request parallely.

  9. #9
    Join Date
    Jan 2008
    Location
    Mohnton, PA USA (that's near Philadelphia)
    Posts
    2,148

    Default

    You configuration looks correct, I just tested it. May be because your sleep in the service-activator is so small (.01 millisecond), that between printing the log and sleeping you don't really see the difference.
    Just for an experiment, try to make your service deliberately slower (increase the sleep time) and you'll see the diff.
    Here is what I have just tested.
    Gateway:
    Code:
    public void execute(String request);
    ServiceActivator
    Code:
    public String echo(String value) throws Exception{
    	Thread.sleep(1000);
    	System.out.println("request: " + value);
    	return value;
    }
    Test
    Code:
    for (int i = 0; i < 10; i++) {
    	gateway.execute("Hello");
    	System.out.println("sent request");
    }
    Config is the same as yours and the output is:
    Code:
    . . .
    sent request
    sent request
    sent request
    request: Hello
    request: Hello
    request: Hello
    . . .
    As you can see requests are first sent and the requesting loop is over . . . way before service-activator gets a chance to execute even the first one.
    Also, notice how my Gateway return void even though service-activator has a non-void signature

  10. #10

    Default

    Oleg in your above code how would you get the reply of the SA
    your SA is returning a String. What if I want the returned string?

    Also to let u know my delay in SA is 10 secs & not 0.1 milliseconds.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •