Page 1 of 2 12 LastLast
Results 1 to 10 of 18

Thread: programmatic RabbitTemplate configuration

  1. #1
    Join Date
    Oct 2010
    Posts
    21

    Default programmatic RabbitTemplate configuration

    hi gary:
    what would be the java programmatic way to do the following spring-context:
    <rabbit:template id="amqpTemplate"
    connection-factory="connectionFactory" reply-queue="replies">
    <rabbit:reply-listener />
    </rabbit:template>

    is it enough to do the following:
    //Queue queueX
    RabbitTemplate template = new RabbitTemplate(connFactory);
    //queueX is declared using rabbit admin?
    template.setReplyQueue(queueX);

    is there anything to be done for setting a reply-listener as in the spring-context config?

    regards,
    -cogitate

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Default

    It's quite a lot more involved than that; you need to create and configure a SimpleMessageListenerContainer, and set the RabbitTemplate as its MessageListener.

    Probably the best place to start is to look at the TemplateParser.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Oct 2010
    Posts
    21

    Default

    thanks gary! will do.
    in the meanwhile if i were to use the template as a prototype bean for configuring a template or maybe use @Configuration is there a way i can do the same thing?
    1. would like to configure template with a reply-queue
    2. would like to use connections configured with "N" channels (max)
    3. and of course, use a reply-listener...

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Default

    Hi Monish,

    Here's how to programmatically configure a RabbitTemplate to use a fixed reply queue with a reply listener container...

    Code:
    SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer(connectionFactory);
    replyContainer.setQueueNames("reply.queue");
    
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setReplyQueue(new Queue("reply.queue"));
    
    replyContainer.setMessageListener(template);
    replyContainer.afterPropertiesSet();
    replyContainer.start();
    
    
    
    Object reply = template.convertSendAndReceive("test.exchange", "test.binding", "Hello, world!");
    Hope that helps.

    There is currently no way to limit the number of channels; the number of cached channels can be controlled on the CachingConnectionFactory.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5
    Join Date
    Oct 2010
    Posts
    21

    Default

    thanks Gary!
    this goes a long way in improving what i have today.
    many many thanks,
    -monish

  6. #6
    Join Date
    Oct 2010
    Posts
    21

    Default

    Hi Gary:
    sorry to bother you, but i have one more question. if i'd want to have control over #of connections how would i do that?
    let's say configuration params from user is :
    [1] # of connections = 2
    [2] # of channels(threads/reply_queues) = 6 ( executorservice threads per connection )
    for a total concurrency of 12. is it possible to specify this?

    kind regards,
    -cogitate

  7. #7
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Default

    No; each connection factory uses a single connection for all channels.

    If you want to use a separate connection for different components, specify a connection factory for each.

    There is no way to limit channels per connection; that is entirely controlled by threading. Channels are cached so the actual number of channels is often (much) less than the number of threads.

    For the listener container, one channel is used by each consumer (controlled by the concurrentConsumers property). For the RabbitTemplate, channels are retrieved from the cache or created as necessary. If three threads are calling the template concurrently, three channels will be used. By default, only one channel is cached; this is controlled by the channelCacheSize property. When a thread "closes" a channel, and the cache isn't full, the channel is returned to the cache. You might have 50 threads using the template, but if only 3 of them are actually using the template at the same time, only 3 channels will be used.

    I hope that explains.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  8. #8
    Join Date
    Oct 2010
    Posts
    21

    Default

    Thank you much for this explanation.
    -- as a feedback to you i tested several scenarios:
    [1] multithreaded test with different timeouts to see if there are cross correlation mismatch ( passes great )
    [2] disconnect tests [client is up, restart broker]( yet to verify all tests, but from what i can see works!)
    -- one thing which might be useful is to copy "spring_reply_correlation" key back to the message from rabbitTemplate.sendAndReceive().it seems the defaultMessagePropertiesConverter currently doesn't copy this over.
    it might be argued that it's transparent to the client (for ex. even to start with), but when i see the SimpleAsyncTaskExecutor return with a spring_reply_correlation# it's difficult to know what message to correlate that with (for ex. timeout scenarios)

    much thanks,
    -monish

  9. #9
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,040

    Default

    Hi Monish,

    We do have an open JIRA to support using the standard Rabbit CorrelationId property instead of this custom Spring property (https://jira.springsource.org/browse/AMQP-271). That is mapped by the default converter.

    This will likely become the standard in 1.2 (while still supporting the Spring property for backward compatibility).
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  10. #10
    Join Date
    Oct 2010
    Posts
    21

    Default

    thanks Gary! i do see rabbit correlationid gets mapped by the default converter.

    VIP testing using two brokers:
    i use rabbitTemplate and simpleMessageListenerContainer(listener) like this:
    * create a pool of rabbitTemplate clients that aggregates the listener, pre-configured to use a particular reply-queue ( to save on creation/deletion of reply queues).
    * when a particular broker behind the VIP is shutdown, the listener manages to connect to the other broker thru the VIP, however, since the amqp (rabbitmq) queue is created outside the listener and since the idea is to re-use the queue, there's no feedback to re-create the queue in the new connection.

    is there any api in the listener container i can make use of to get notified that i need to re-declare the queue?

    regards,
    -monish

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •