Results 1 to 6 of 6

Thread: Message properties contains java.io.DataInputStream

Hybrid View

  1. #1
    Join Date
    Dec 2012
    Posts
    12

    Default Message properties contains java.io.DataInputStream

    RabbitMQ version 3
    org.springframework.amqp 1.0.0.RELEASE


    Hi, I am new to spring and amqp and I have an error in a project.

    First here is the code that puts the value in the org.springframework.amqp.core.MessageProperties

    Code:
     public final void sendClientUpdates(final String queueName, final String textMessage,
                                            final Object updateIds, final String replyUrl) {
            MessageProperties messageProperties = new MessageProperties();
            ByteArrayOutputStream output = new ByteArrayOutputStream();
            XMLEncoder encoder = new XMLEncoder(output);
            encoder.writeObject(updateIds);
            try {
                encoder.close();
                output.close();
            } catch (IOException ex) {
                logger.warn("", ex);
            }
            messageProperties.setHeader(UPDATE_ID_PROPERTY, output.toString());

    When reading the message from the queue

    Code:
    public void onMessage(final Message message) {
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            XMLDecoder decoder = new XMLDecoder(
                        new ByteArrayInputStream(((String) headers.get(UPDATE_ID_PROPERTY)).getBytes()));
    I get a
    java.lang.ClassCastException: java.io.DataInputStream cannot be cast to java.lang.String

    Suddenly UPDATE_ID_PROPERTY is a java.io.DataInputStream.

    This does not happen all the time... but very often.

    The UPDATE_ID_PROPERTY would normally look like this

    UPDATE_ID_PROPERTY : <?xml version="1.0" encoding="UTF-8"?>
    <java version="1.7.0_13" class="java.beans.XMLDecoder">
    <object class="java.util.ArrayList">
    <void method="add">
    <long>162</long>
    </void>
    </object>
    </java>

  2. #2
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,016

    Default

    Please provide a full stack trace for the exception as well as a DEBUG log that includes the message delivery (and exception) if possible.

    Also, spring-amqp 1.0.0 is very old; please update to 1.1.4.RELEASE.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  3. #3
    Join Date
    Dec 2012
    Posts
    12

    Default

    The error is in our live environment.. but so far I haven't been able to reproduce it a test environment.
    Here is the full stacktrace

    [ERROR] 2013-03-08 16:10:36,972 [com.farheap.jsi.mdp.MessageListenerJSI3] - Unknown error while sending updates from message (Body:'[B@54e0f629(byte[12156])'; ID:null; Content:application/octet-stream; Headers:{ENDPOINT_URL=https://xxxxx/php/classes/JSI3Receiver.class.php, UPDATE_ID=java.io.DataInputStream@7d412947}; Exchange:; RoutingKey:jsi.to.external; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:118)
    java.lang.ClassCastException: java.io.DataInputStream cannot be cast to java.lang.String
    ********at com.farheap.jsi.mdp.MessageListenerJSI3.onMessage( MessageListenerJSI3.java:74) ~[jsi-webservice-3.jar:na]
    ********at com.farheap.jsi.mdp.MessageListenerJSI3$$FastClass ByCGLIB$$91448113.invoke(<generated>) [cglib-nodep-2.1_3.jar:na]
    ********at net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy. java:149) [cglib-nodep-2.1_3.jar:na]
    ********at org.springframework.aop.framework.Cglib2AopProxy$C glibMethodInvocation.invokeJoinpoint(Cglib2AopProx y.java:688) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.aop.framework.ReflectiveMethod Invocation.proceed(ReflectiveMethodInvocation.java :150) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.transaction.interceptor.Transa ctionInterceptor.invoke(TransactionInterceptor.jav a:110) [spring-tx-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.aop.framework.ReflectiveMethod Invocation.proceed(ReflectiveMethodInvocation.java :161) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.transaction.interceptor.Transa ctionInterceptor.invoke(TransactionInterceptor.jav a:110) [spring-tx-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.aop.framework.ReflectiveMethod Invocation.proceed(ReflectiveMethodInvocation.java :161) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.aop.interceptor.ExposeInvocati onInterceptor.invoke(ExposeInvocationInterceptor.j ava:89) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.aop.framework.ReflectiveMethod Invocation.proceed(ReflectiveMethodInvocation.java :172) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at org.springframework.aop.framework.Cglib2AopProxy$D ynamicAdvisedInterceptor.intercept(Cglib2AopProxy. java:621) [spring-aop-3.0.5.RELEASE.jar:3.0.5.RELEASE]
    ********at com.farheap.jsi.mdp.MessageListenerJSI3$$EnhancerB yCGLIB$$63a53782.onMessage(<generated>) [cglib-nodep-2.1_3.jar:na]
    ********at org.springframework.amqp.rabbit.listener.adapter.M essageListenerAdapter.onMessage(MessageListenerAda pter.java:328) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.doInvokeListener(AbstractM essageListenerContainer.java:506) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.invokeListener(AbstractMes sageListenerContainer.java:470) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.access$001(SimpleMessageList enerContainer.java:56) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer$1.invokeListener(SimpleMessa geListenerContainer.java:103) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.invokeListener(SimpleMessage ListenerContainer.java:560) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.AbstractM essageListenerContainer.executeListener(AbstractMe ssageListenerContainer.java:452) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.doReceiveAndExecute(SimpleMe ssageListenerContainer.java:436) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.receiveAndExecute(SimpleMess ageListenerContainer.java:420) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer.access$200(SimpleMessageList enerContainer.java:56) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at org.springframework.amqp.rabbit.listener.SimpleMes sageListenerContainer$AsyncMessageProcessingConsum er.run(SimpleMessageListenerContainer.java:505) [spring-rabbit-1.0.0.RELEASE.jar:na]
    ********at java.lang.Thread.run(Thread.java:662) [na:1.6.0_26]

  4. #4
    Join Date
    Mar 2010
    Location
    Gtr Philadelphia, PA
    Posts
    2,016

    Default

    After some investigation, I see the MessagePropertiesConverter has a threshold for the size of the property...

    Code:
    /**
     * Converts a LongString value to either a String or DataInputStream based on a length-driven threshold. If the
     * length is 1024 bytes or less, a String will be returned, otherwise a DataInputStream is returned.
     */
    private Object convertLongString(LongString longString, String charset) {
    	try {
    		if (longString.length() <= 1024) {
    			return new String(longString.getBytes(), charset);
    		} else {
    			return longString.getStream();
    		}
    	} catch (Exception e) {
    		throw RabbitUtils.convertRabbitAccessException(e);
    	}
    }
    So, a quick fix would be detecting the type in your listener and calling getBytes() on the DataInputStream.

    The threshold (1024) perhaps should be configurable; if you feel strongly please open up an 'Improvement' JIRA issue.

    We should at least document this in the reference manual.
    Last edited by Gary Russell; Mar 9th, 2013 at 05:13 PM.
    Gary P. Russell
    Spring Integration Team
    SpringSource, a division of VMware

  5. #5
    Join Date
    Dec 2012
    Posts
    12

    Default

    Updating to 1.1.4 didn't help.

    I fixed the problem by doing this :

    MessageProperties messageProperties = new MessageProperties();
    StringBuilder builder = new StringBuilder();
    for (Iterator<Long> iterator = updateIds.iterator(); iterator.hasNext() {
    Long long1 = (Long) iterator.next();
    builder.append(long1).append(";");
    }
    messageProperties.setHeader(MessageListenerJSI3.UP DATE_ID_PROPERTY, builder.toString());
    and

    Map<String, Object> headers = message.getMessageProperties().getHeaders();

    try {
    String sEndpointUrl = (String) headers.get(ENDPOINT_URL_PROPERTY);
    updateIds = new ArrayList<Long>();
    String idsStr = (String) headers.get(UPDATE_ID_PROPERTY);
    StringTokenizer stringTokenizer = new StringTokenizer(idsStr, ";");
    while(stringTokenizer.hasMoreTokens()){
    String nextElement = (String) stringTokenizer.nextElement();
    updateIds.add(Long.parseLong(nextElement));
    }

  6. #6
    Join Date
    Dec 2012
    Posts
    12

    Default

    Oh I see Gary.. that's the reason..

    So my code fix will actually not really fix the problem. It could still happen.

    I made this change to handle DataInputStream

    Map<String, Object> headers = message.getMessageProperties().getHeaders();
    String idsStr = "";
    try {
    String sEndpointUrl = (String) headers.get(ENDPOINT_URL_PROPERTY);
    updateIds = new ArrayList<Long>();
    Object object = headers.get(UPDATE_ID_PROPERTY);
    if (object instanceof DataInputStream) {
    DataInputStream dataInputStream = (DataInputStream) object;
    idsStr = IOUtils.toString(dataInputStream);
    } else {
    idsStr = (String) headers.get(UPDATE_ID_PROPERTY);
    }
    Last edited by klind; Mar 9th, 2013 at 06:13 PM.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •