Results 1 to 10 of 10

Thread: sending a message on RabbitMQ queue via SMTP with Spring Integration

Threaded View

  1. #1
    Join Date
    Sep 2012
    Posts
    19

    Default sending a message on RabbitMQ queue via SMTP with Spring Integration

    Hi, I am relatively new to RabbitMQ and Spring Integration, but using the Integration Samples for Mail and AMQP, and the advice in this thread, I have successfully configured a program that periodically checks a POP/SSL mailbox and puts any messages it finds on a RabbitMQ queue via AMQP. Kudos to the Spring developers who have made this type of integration straightforward to develop.

    Conversely, I would like to have a separate program that checks the contents of a queue via AMQP and submits the message to a transformer class, which in turn constructs an email from the original RMQ message payload and sends it to a recipient via SMTP (communicating with the server over SSL).

    I am not sure how the XML should look to move the data from RabbitMQ to the transformer class. I read a good example of how to get started on the SMTP side, but its input was a file rather than a RabbitMQ queue.

    So far I have this:
    amqpsmtp-config.xml
    Code:
    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        AMQP-to-SMTP flow:
          two RabbitMQ queues -> fromRabbit channel -> MailTransformer -> outgoingMail channel -> SMTP adapter.
        Both channels are wire-tapped to the logging adapter.
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
           xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="
               http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
               http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
               http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
               http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
               http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
               http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

        <!-- Inbound: both queues feed the same channel -->

        <int-amqp:inbound-channel-adapter id="ackqueue"
                                          channel="fromRabbit"
                                          queue-names="immunization.acknowledge.queue"
                                          connection-factory="connectionFactory"/>

        <int-amqp:inbound-channel-adapter id="ackerror"
                                          channel="fromRabbit"
                                          queue-names="immunization.error.queue"
                                          connection-factory="connectionFactory"/>

        <int:channel id="fromRabbit">
            <int:interceptors>
                <int:wire-tap channel="loggingChannel"/>
            </int:interceptors>
        </int:channel>

        <!-- Transformation: RabbitMQ payload -> MIME message -->

        <!--
            MailTransformer extends AbstractTransformer, so the bean is itself a
            Transformer and no "method" attribute is needed (doTransform is protected).
        -->
        <int:transformer input-channel="fromRabbit"
                         output-channel="outgoingMail"
                         ref="MailTransformer"/>

        <bean id="MailTransformer" class="com.ehrdoctors.amqp.amqpsmtp.MailTransformer">
            <property name="mailSender" ref="mailSender"/>
        </bean>

        <!-- Outbound: SMTP -->

        <!--
            The JavaMail session properties are set on the sender itself, because this
            one bean is used both by MailTransformer (to create the MimeMessage) and by
            the outbound adapter (to send it).
        -->
        <bean id="mailSender" class="org.springframework.mail.javamail.JavaMailSenderImpl">
            <property name="host" value="XXXXXXXXX"/>
            <property name="username" value="XXXXXXX"/>
            <property name="password" value="XXXXXXXX"/>
            <property name="javaMailProperties" ref="javaMailProperties"/>
        </bean>

        <util:properties id="javaMailProperties">
            <!-- "mail.debug" is the JavaMail session debug switch; "mail.smtp.debug" is not a JavaMail property -->
            <prop key="mail.debug">true</prop>
            <prop key="mail.smtp.auth">true</prop>
            <prop key="mail.smtp.port">25</prop>
            <!-- SSL parameters: uncomment to talk to the server over SSL on port 465
            <prop key="mail.smtp.socketFactory.class">javax.net.ssl.SSLSocketFactory</prop>
            <prop key="mail.smtp.socketFactory.port">465</prop>
            <prop key="mail.smtp.socketFactory.fallback">false</prop>
            -->
        </util:properties>

        <int:channel id="outgoingMail">
            <int:interceptors>
                <int:wire-tap channel="loggingChannel"/>
            </int:interceptors>
        </int:channel>

        <!--
            Sends the mail using the credentials defined by the mailSender bean.
            NOTE(review): "java-mail-properties" is intended for the case where the adapter
            builds its own sender from host/username/password; with a "mail-sender"
            reference the properties are configured on that bean instead. Confirm against
            the Spring Integration version in use.
        -->
        <int-mail:outbound-channel-adapter id="outboundMailAdapter"
                                           channel="outgoingMail"
                                           mail-sender="mailSender"/>

        <int:logging-channel-adapter id="loggingChannel" log-full-message="true" level="DEBUG"/>

        <!-- Infrastructure -->

        <rabbit:connection-factory id="connectionFactory"
                                   host="localhost"
                                   port="5672"
                                   virtual-host="immunization"
                                   username="XXXXX"
                                   password="XXXXXX"/>
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
        <rabbit:admin connection-factory="connectionFactory"/>

        <rabbit:queue name="immunization.acknowledge.queue"/>
        <rabbit:queue name="immunization.error.queue"/>

        <!-- One exchange declaration carrying both bindings; declaring it twice defines two beans with the same name -->
        <rabbit:topic-exchange name="immunization.message.exchange">
            <rabbit:bindings>
                <rabbit:binding queue="immunization.acknowledge.queue" pattern="immunization.acknowledge.binding"/>
                <rabbit:binding queue="immunization.error.queue" pattern="immunization.error.binding"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>

    </beans>
    MailTransformer.java
    Code:
    package com.ehrdoctors.amqp.amqpsmtp;
    
    import org.springframework.mail.javamail.MimeMailMessage;
    import org.springframework.mail.javamail.MimeMessageHelper;
    import javax.mail.internet.MimeMessage;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import org.w3c.dom.CharacterData;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.Node;
    import org.w3c.dom.NodeList;
    import org.xml.sax.InputSource;
    import javax.xml.xpath.*;
    import java.io.StringReader;
    import java.io.ByteArrayInputStream;
    import java.util.Date;
    import org.springframework.integration.Message;
    import org.springframework.mail.javamail.JavaMailSenderImpl;
    import org.apache.log4j.Logger;
    import org.springframework.integration.transformer.AbstractTransformer;
    import org.springframework.core.io.ByteArrayResource;
    import org.apache.commons.codec.binary.Base64;
     
    /**
     * Turns an XML message payload into a {@link MimeMessage}.
     *
     * The payload is read with these XPath expressions:
     *   /Message/From, /Message/To, /Message/Subject   - mail headers
     *   /Message/Content/text()                        - mail body
     *   /Message/attachments/attachment                - base64-encoded attachments,
     *                                                    each with a "filename" attribute
     */
    public class MailTransformer  extends AbstractTransformer {

        private static final Logger logger = Logger.getLogger(MailTransformer.class);

        /** Used only to create the empty MimeMessage; sending happens downstream. */
        private JavaMailSenderImpl mailSender;

        public void setMailSender(JavaMailSenderImpl jms) {
            this.mailSender = jms;
        }

        public JavaMailSenderImpl getMailSender() {
            return mailSender;
        }

        /**
         * Builds the mail from the message payload.
         *
         * @param message message whose payload is the XML document, as a String or byte[]
         * @return the populated MimeMessage
         * @throws Exception if the payload cannot be parsed or the mail cannot be built.
         *         The failure is logged and rethrown rather than passing the untransformed
         *         message on to the mail channel.
         */
        @Override
        protected Object doTransform(Message<?> message) throws Exception {
            try {
                Document doc = parsePayload(message.getPayload());
                XPath xPath = XPathFactory.newInstance().newXPath();

                // Extract the To, From, and Subject from the message
                String mailFrom = xPath.evaluate("/Message/From", doc);
                String mailTo = xPath.evaluate("/Message/To", doc);
                String mailSubject = xPath.evaluate("/Message/Subject", doc);

                // The body uses text() because the Content is in CDATA
                String body = xPath.evaluate("/Message/Content/text()", doc);

                // Select every attachment element, not just the wrapper, so that
                // several attachments under one <attachments> element are all found.
                NodeList attachments = (NodeList) xPath.evaluate(
                        "/Message/attachments/attachment", doc, XPathConstants.NODESET);

                // A NODESET result is never null; multipart is needed only when
                // there is at least one attachment.
                boolean multipart = attachments.getLength() > 0;

                MimeMessage mimeMsg = mailSender.createMimeMessage();
                MimeMessageHelper helper = new MimeMessageHelper(mimeMsg, multipart);
                helper.setTo(mailTo);
                helper.setFrom(mailFrom);
                helper.setSubject(mailSubject);
                helper.setSentDate(new Date());
                helper.setText(body);

                addAttachments(helper, attachments);
                return helper.getMimeMessage();
            }
            catch (Exception e) {
                logger.error("Failed to build mail from message", e);
                throw e;
            }
        }

        /** Parses the payload as XML; byte[] payloads are parsed as bytes, anything else via toString(). */
        private Document parsePayload(Object payload) throws Exception {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // The payload comes from outside this application, so refuse DOCTYPE
            // declarations to rule out external-entity (XXE) and entity-expansion attacks.
            // NOTE(review): this rejects any message that carries a DOCTYPE - confirm none do.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            DocumentBuilder builder = factory.newDocumentBuilder();

            InputSource source;
            if (payload instanceof byte[]) {
                // toString() on a byte[] yields "[B@..." rather than the content
                source = new InputSource(new ByteArrayInputStream((byte[]) payload));
            }
            else {
                source = new InputSource(new StringReader(payload.toString()));
            }
            return builder.parse(source);
        }

        /** Decodes each attachment element from base64 and adds it to the mail under its "filename". */
        private void addAttachments(MimeMessageHelper helper, NodeList attachments) throws Exception {
            for (int i = 0; i < attachments.getLength(); i++) {
                Element attachment = (Element) attachments.item(i);
                String fileName = attachment.getAttribute("filename");
                String encoded = attachment.getTextContent();

                if (fileName.length() == 0 || encoded == null || encoded.trim().length() == 0) {
                    logger.warn("Skipping attachment " + i + ": missing filename or content");
                    continue;
                }

                byte[] decoded = Base64.decodeBase64(encoded);
                helper.addAttachment(fileName, new ByteArrayResource(decoded));
                // Log the name and size only, never the attachment content itself
                logger.info("Added attachment " + i + " with filename: " + fileName
                        + " (" + decoded.length + " bytes)");
            }
        }
    }

    I run it like this:
    Code:
    bash$: java -cp "target/amqp-2.1.0.BUILD-SNAPSHOT.jar:lib/*" -Djavax.net.ssl.trustStore=cacerts.jks org.springframework.integration.samples.amqp.Main

    Cheers,

    Richard
    Last edited by ehrdoctors; Nov 12th, 2012 at 02:48 PM.

Tags for this Thread

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •