Results 1 to 7 of 7

Thread: Spring AMQP: would like to put message to queue and send ACK immediately

  1. #1
    Join Date
    Aug 2012
    Posts
    5

    Default Spring AMQP: would like to put message to queue and send ACK immediately

    I wrote Java application, that sends messages to RabbitMQ. Then Flume picks messages up from RabbitMQ queue. I'm interested that nobody pulls messages from the queue, except flume.

    My application uses Spring AMQP Java plugin.
    The problem:

    With the code below, message comes to RabbitMQ queue and stays 'Unknowledges' for ever. As I understand, RabbitMQ is waiting for ACK from MessageListener, but MessageListener will never ACK. Does anybody have idea how to fix it?

    The code:

    public class MyAmqpConfiguration {

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(activityLogsQueue());
    container.setMessageListener(MyMessageListener());
    container.setConcurrentConsumers(3);

    return container;
    }

    @Bean(name="myTemplate")
    public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(MyMessageConverter()) ;
    return template;
    }
    }


    public class MyMessageListener implements MessageListener {


    public MyMessageListener(MessageConverter converter, MyMessageHandler<MyObject> messageHandler) {
    this.converter = converter;
    this.messageHandler = messageHandler;
    }

    @Override
    public void onMessage(Message message) {
    this.messageHandler.doThings();
    }

    }

    public class MyMessageHandler {

    @Autowired
    @Qualifier("myTemplate")
    RabbitTemplate template;

    @Override
    public void handleMessage(MyObject thing) {
    template.convertAndSend(exchange, routingKey, thing);
    }

    }


    public class MyMessageConverter extends JsonMessageConverter {

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
    //do things
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
    throw new UnsupportedOperationException("fromMessage is not supported in "+this.getClass().getName());
    }


    }

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017

    Default

    Your question is not clear.

    If Flume is the consumer, why do you have a listener?

    Senders don't ack messages, consumers do; the ack is the responsibility of the consumer.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Aug 2012
    Posts
    5

    Default

    Hi,

    Thank you for the answer.
    Actually, Flume is consumer.
    Then, should I just leave template without listener?

    Quote Originally Posted by Gary Russell View Post
    Your question is not clear.

    If Flume is the consumer, why do you have a listener?

    Senders don't ack messages, consumers do; the ack is the responsibility of the consumer.

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017

    Default

    Yes; if you only want to send; you only need a template, but I am interested in why you thought you needed a listener in the first place.

    However, this does mean your problem (missing acks) is a problem with your Flume configuration because, like I said, it's the consumer that acks.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5
    Join Date
    Aug 2012
    Posts
    5

    Default

    When I create template without MessageListener, I get following Exception:

    WARN org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: java.lang.IllegalStateException: No message listener specified - see property 'messageListener'

    Quote Originally Posted by Gary Russell View Post
    Yes; if you only want to send; you only need a template, but I am interested in why you thought you needed a listener in the first place.

    However, this does mean your problem (missing acks) is a problem with your Flume configuration because, like I said, it's the consumer that acks.

  6. #6
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,017

    Default

    Well, you'll get that if you define a listener Container and no listener, but you don't need a ListenerContainer.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  7. #7
    Join Date
    Aug 2012
    Posts
    5

    Default

    Perfect!
    I removed unnesessary ListeneConteiner, and it fixed the problem with 'unuknowledged' messages!!!
    Thank you!

    Quote Originally Posted by Gary Russell View Post
    Well, you'll get that if you define a listener Container and no listener, but you don't need a ListenerContainer.

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •