Page 1 of 2 12 LastLast
Results 1 to 10 of 17

Thread: Jmstemplate is a bottle neck in performance

  1. #1
    Join Date
    Jul 2012
    Posts
    27

    Default Jmstemplate is a bottle neck in performance

    Hi, i am comparing the performance of active mq using spring and with out spring.
    it seems that with spring is much slower.
    It looks like the problem is with sending and not receiving. When i use jmstemplate it is much slower.
    any ideas why?
    how can i improve performance of jms template?

    please note that i tested these 2 configurations:

    Code:
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
             <property name="connectionFactory" ref="pooledConnectionFactory"/>  
         </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
             <property name="connectionFactory" ref="cachingConnectionFactory"/>  
         </bean>
    thanks
    Last edited by michals; Dec 5th, 2012 at 05:15 AM.

  2. #2
    Join Date
    Jun 2006
    Location
    The Netherlands
    Posts
    13,695

    Default

    Post your tests... My guess is that you are comparing apples and oranges...

    Also when posting code use [ code][/code ] tags that way it remains readable.
    Marten Deinum
    Java Consultant / Pragmatist / Open Source Enthousiast / Author


    Pro Spring MVC: With Web Flow
    Conspect

    Have you read the reference guide.
    Use the [ code ] tags, young padawan

  3. #3
    Join Date
    Jul 2012
    Posts
    27

    Default

    I can't post my entire test, i don't see where i can attach files.

    I am testing this flow:
    1.Client->send request ->Server.
    2. Server->send response->Client.
    3. Client->receive response&send message to some measurement client
    4. Measurement Client->receive message - after receiving all messages it will stop the timer.

    This full flow is tested for 300k messages.
    and it takes for Activemq with no spring half the time as it does for with spring.

    Here is the spring xml i am using.
    Code:
    <!-- ********************** Consumer ********************************* --> 
    	<bean id="clientListener" class="example.ClientReceive">
    		<property name="receiveQueue" ref="receiveQueue" />
    		<property name="sinkQueue" ref="sinkQueue" />
    		<property name="clientSender" ref="clientSender"/> 
    	</bean>
    
    	<bean id="containerClient"
    		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
                <property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
                <property name="idleTaskExecutionLimit" value="10"/>
                <!-- <property name="maxMessagesPerTask" value="30"/> -->
    		<property name="connectionFactory" ref="singleConnectionFactory" />
    		<property name="destination" ref="receiveQueue" />
    		<property name="messageListener" ref="clientListener" />
    		<property name="sessionAcknowledgeMode" value="1"/>
    	</bean>
    	
    	
    	
    	<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queueA"/>
        </bean>
        <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="sinkQueue"/>
        </bean>
        
        <!-- **********************  END-Consumer *****************************	-->
        
    	
    	<!-- ********************** Producer ********************************* --> 
    	<bean id="clientSender" class="example.ClientSend">
    		 <property name="sendQueue" ref="sendQueue" /> 
    		 <property name="jmsTemplate" ref="jmsTemplate"/>  
    	</bean>
    	
    	 <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queueB"/>
        </bean> 
        
         <!-- for send response on temp q-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
             <property name="connectionFactory" ref="cachingConnectionFactory"/>  
         </bean>
    	
    	<!-- ********************** END Producer ********************************* -->
     
        
        
        <!-- ********************** Common ********************************* -->
        
    	<bean id="connectionFactory" 
    		class="org.apache.activemq.ActiveMQConnectionFactory">
    		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
    		 <!-- <property name="brokerURL" value="tcp://localhost:61616" /> --> 
    		  <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616?wireFormat.tightEncodingEnabled=false)?randomize=false" /> --><!-- for vertical scalability-->
    		 <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://dev15:61616)?randomize=true"/> --> <!-- network of brokers -->
    		<!-- <property name="brokerURL" value="failover:(tcp://dev15:61616)?randomize=true"/> -->
    		<!-- <property name="brokerURL" value="vm://amq3?brokerConfig=xbean:amq3.xml" /> --><!-- embedded broker for network of brokers -->
    		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61617)" />  --><!-- service broker for network of brokers : connect to amq3-->
    		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618)" /> --> <!-- connection amq3 -->
    	</bean>
    	
        <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        	 <property name="targetConnectionFactory"
        		ref="connectionFactory" />
        		<property name="reconnectOnException" value="true"/>
        </bean>
        
         <bean id="pooledConnectionFactory"  
    		class="org.apache.activemq.pool.PooledConnectionFactory">
    		<constructor-arg ref="connectionFactory"/>
    		<property name="maxConnections" value="10" /> 
    	</bean>    
    	
    	  <bean id="cachingConnectionFactory"  
    		class="org.springframework.jms.connection.CachingConnectionFactory">
    		<constructor-arg ref="singleConnectionFactory"/>
    		<property name="sessionCacheSize" value="100" /> 
    	</bean>   
    	
        <!-- <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="sinkQueue"/>
        </bean> -->
    	
         
    </beans>
    I am not sure what you mean by "comparing apples and oranges"... i am testing the same code, once with spring and once with
    creating my own activemq objects and using

  4. #4
    Join Date
    Jun 2006
    Location
    The Netherlands
    Posts
    13,695

    Default

    I am not sure what you mean by "comparing apples and oranges"... i am testing the same code, once with spring and once with
    creating my own activemq objects and using
    Which is exactly what I mean by comparing apples and oranges...

    Hence my request for the code you wrote/use (inlcuding the configuration)...
    Marten Deinum
    Java Consultant / Pragmatist / Open Source Enthousiast / Author


    Pro Spring MVC: With Web Flow
    Conspect

    Have you read the reference guide.
    Use the [ code ] tags, young padawan

  5. #5
    Join Date
    Jul 2012
    Posts
    27

    Default

    Is there a way to attach files other than images?

  6. #6
    Join Date
    Jun 2006
    Location
    The Netherlands
    Posts
    13,695

    Default

    By simply attaching them (you might need to go to advanced mode for that)... If the code isn't that large you could copy/paste in the post using [ code][/code ] tags to maintain formatting.
    Marten Deinum
    Java Consultant / Pragmatist / Open Source Enthousiast / Author


    Pro Spring MVC: With Web Flow
    Conspect

    Have you read the reference guide.
    Use the [ code ] tags, young padawan

  7. #7
    Join Date
    Jul 2012
    Posts
    27

    Default

    This is a big example and for some reason when i try to attach the files - it indicates that the file is bad.
    I will add some of the code here: which shows how the client is implemented. the server is basically the same
    so i didn't add it.
    thanks

    Code:
    spring-client.xml:
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:jms="http://www.springframework.org/schema/jms"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans 
    	       	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    	       	http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
           		http://www.springframework.org/schema/context
           		http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    
    
    	<!-- ********************** Consumer ********************************* --> 
    	<bean id="clientListener" class="example.ClientReceive">
    		<property name="receiveQueue" ref="receiveQueue" />
    		<property name="sinkQueue" ref="sinkQueue" />
    		<property name="clientSender" ref="clientSender"/> 
    	</bean>
    
    	<bean id="containerClient"
    		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
                <property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
                <property name="idleTaskExecutionLimit" value="10"/>
                <!-- <property name="maxMessagesPerTask" value="30"/> -->
    		<property name="connectionFactory" ref="singleConnectionFactory" />
    		<property name="destination" ref="receiveQueue" />
    		<property name="messageListener" ref="clientListener" />
    		<property name="sessionAcknowledgeMode" value="1"/>
    	</bean>
    	
    	
    	
    	<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queueA"/>
        </bean>
        <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="sinkQueue"/>
        </bean>
        
        <!-- **********************  END-Consumer *****************************	-->
        
    	
    	<!-- ********************** Producer ********************************* --> 
    	<bean id="clientSender" class="example.ClientSend">
    		 <property name="sendQueue" ref="sendQueue" /> 
    		 <property name="jmsTemplate" ref="jmsTemplate"/>  
    	</bean>
    	
    	 <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queueB"/>
        </bean> 
        
         <!-- for send response on temp q-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
             <property name="connectionFactory" ref="cachingConnectionFactory"/>  
         </bean>
    	
    	<!-- ********************** END Producer ********************************* -->
     
        
        
        <!-- ********************** Common ********************************* -->
        
    	<bean id="connectionFactory" 
    		class="org.apache.activemq.ActiveMQConnectionFactory">
    		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
    		 <!-- <property name="brokerURL" value="tcp://localhost:61616" /> --> 
    		  <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616?wireFormat.tightEncodingEnabled=false)?randomize=false" /> --><!-- for vertical scalability-->
    		 <!-- <property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://dev15:61616)?randomize=true"/> --> <!-- network of brokers -->
    		<!-- <property name="brokerURL" value="failover:(tcp://dev15:61616)?randomize=true"/> -->
    		<!-- <property name="brokerURL" value="vm://amq3?brokerConfig=xbean:amq3.xml" /> --><!-- embedded broker for network of brokers -->
    		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61617)" />  --><!-- service broker for network of brokers : connect to amq3-->
    		<!-- <property name="brokerURL" value="failover:(tcp://0.0.0.0:61618)" /> --> <!-- connection amq3 -->
    	</bean>
    	
        <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        	 <property name="targetConnectionFactory"
        		ref="connectionFactory" />
        		<property name="reconnectOnException" value="true"/>
        </bean>
        
         <bean id="pooledConnectionFactory"  
    		class="org.apache.activemq.pool.PooledConnectionFactory">
    		<constructor-arg ref="connectionFactory"/>
    		<property name="maxConnections" value="10" /> 
    	</bean>    
    	
    	  <bean id="cachingConnectionFactory"  
    		class="org.springframework.jms.connection.CachingConnectionFactory">
    		<constructor-arg ref="singleConnectionFactory"/>
    		<property name="sessionCacheSize" value="100" /> 
    	</bean>   
    	
        <!-- <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="sinkQueue"/>
        </bean> -->
    	
         
    </beans>
    
    public class ClientReceive implements MessageListener{
    	Connection connection = null;
    	Destination receiveQueue = null;
    	Destination sinkQueue = null;
    	ClientSend clientSender = null;
    	private AtomicInteger counter = new AtomicInteger(0);
    	private String id = null;
    	private boolean verbose = false; 
    	
    	
    	
    	public ClientSend getClientSender() {
    		return clientSender;
    	}
    	public void setClientSender(ClientSend clientSender) {
    		this.clientSender = clientSender;
    	}
    	public Destination getSinkQueue() {
    		return sinkQueue;
    	}
    	public void setSinkQueue(Destination sinkQueue) {
    		this.sinkQueue = sinkQueue;
    	}
    	
    	public Destination getReceiveQueue() {
    		return receiveQueue;
    	}
    	public void setReceiveQueue(Destination receiveQueue) {
    		this.receiveQueue = receiveQueue;
    	}
    	public void start(String id, boolean verbose) throws JMSException {
    		this.id = id;
    		this.verbose = verbose;
    		clientSender.start(id, verbose); 
    	}
    	public void onMessage(Message message) { 
    		try {
    			message.acknowledge();//otherwise the messages get stuck, check why?????
    
    			LogicResponse response = (LogicResponse)(((ObjectMessage)message).getObject());
    			
    			// send to sink notification
    			clientSender.sendRequest(null, sinkQueue);
    
    			if (this.verbose == true){
    				System.out.println("Received response: " + response.getData());
    			}
    			
    		} 
    		catch (Exception e) {
    				e.printStackTrace();
    		}
    	}
    	public class SinkMessageCreator implements MessageCreator {
    
        	//message is of type javax.jms.Message
        	private Message message = null;
        	private Destination destination = null;
        	
        	public SinkMessageCreator(Destination destination)
        	{
        		this.destination = destination;
        	}
        	
     public Message createMessage(Session session) throws JMSException 
        	{
        		message = session.createTextMessage();
        		return message;
        	}
    	}
    
    
    public class ClientSend{
    	Connection connection = null;
    	Session session = null;
    	String brokerUrl = "tcp://localhost:61616";
    	MessageProducer producer = null;
    	Destination sendQueue = null;
        private AtomicInteger counter = new AtomicInteger(0);
    
    	long end = 0;
    	long start = 0;
    	private String id = null;
    	private boolean verbose = false; 
    	
    	JmsTemplate jmsTemplate = null;
    	
    	public JmsTemplate getJmsTemplate() {
    		return jmsTemplate;
    	}
    	public void setJmsTemplate(JmsTemplate jmsTemplate) {
    		this.jmsTemplate = jmsTemplate;
    	}
    	public Destination getSendQueue() {
    		return sendQueue;
    	}
    	public void setSendQueue(Destination sendQueue) {
    		this.sendQueue = sendQueue;
    	}
    	
    	public void start(String id, boolean verbose) throws JMSException {
    		this.id = id;
    		this.verbose = verbose;
    	}
    
    	public void sendRequest(LogicRequest request) throws JMSException {
    		sendRequest(request, sendQueue);
    	}
    	public void sendRequest(LogicRequest request, Destination queue) throws JMSException {
    		jmsTemplate.send(queue, new RequestMessageCreator(request));
    
    	}
    	
    	public class RequestMessageCreator implements MessageCreator {
    
        	//message is of type javax.jms.Message
        	private Message message = null;
        	private Object request = null;
        	private String correlationId = null;
        	
        	public RequestMessageCreator(Object request)
        	{
        		this.request = request;
        	}
        	
        	public Message createMessage(Session session) throws JMSException 
        	{
        		correlationId = UUID.randomUUID().toString(); 
        		 
        		if (this.request != null){
        			message = session.createObjectMessage((Serializable)this.request);
        		}
        		else{
        			message = session.createTextMessage();
        		}
        		message.setJMSCorrelationID(correlationId);
        		return message;
        	}
    
    		public Object getRequest() {
    			return request;
    		}
    
    		public void setRequest(Object request) {
    			this.request = request;
    		}
    
        }

  8. #8
    Join Date
    Jul 2012
    Posts
    27

    Default

    spring-client.xml:
    Code:
    !-- ********************** Consumer ********************************* --> 
    	<bean id="clientListener" class="example.ClientReceive">
    		<property name="receiveQueue" ref="receiveQueue" />
    		<property name="sinkQueue" ref="sinkQueue" />
    		<property name="clientSender" ref="clientSender"/> 
    	</bean>
    
    	<bean id="containerClient"
    		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    		<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
                <property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
                <property name="idleTaskExecutionLimit" value="10"/>
                <!-- <property name="maxMessagesPerTask" value="30"/> -->
    		<property name="connectionFactory" ref="singleConnectionFactory" />
    		<property name="destination" ref="receiveQueue" />
    		<property name="messageListener" ref="clientListener" />
    		<property name="sessionAcknowledgeMode" value="1"/>
    	</bean>
    	
    	<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queueA"/>
        </bean>
        <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="sinkQueue"/>
        </bean>
        
        <!-- **********************  END-Consumer *****************************	-->
        
    	<!-- ********************** Producer ********************************* --> 
    	<bean id="clientSender" class="example.ClientSend">
    		 <property name="sendQueue" ref="sendQueue" /> 
    		 <property name="jmsTemplate" ref="jmsTemplate"/>  
    	</bean>
    	
    	 <bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queueB"/>
        </bean> 
        
         <!-- for send response on temp q-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
             <property name="connectionFactory" ref="cachingConnectionFactory"/>  
         </bean>
    	
    	<!-- ********************** END Producer ********************************* -->
        
        <!-- ********************** Common ********************************* -->
        
    	<bean id="connectionFactory" 
    		class="org.apache.activemq.ActiveMQConnectionFactory">
    		 <property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" />    <!-- Single broker-->
    	</bean>
    	
        <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        	 <property name="targetConnectionFactory"
        		ref="connectionFactory" />
        		<property name="reconnectOnException" value="true"/>
        </bean>
        
         <bean id="pooledConnectionFactory"  
    		class="org.apache.activemq.pool.PooledConnectionFactory">
    		<constructor-arg ref="connectionFactory"/>
    		<property name="maxConnections" value="10" /> 
    	</bean>    
    	
    	  <bean id="cachingConnectionFactory"  
    		class="org.springframework.jms.connection.CachingConnectionFactory">
    		<constructor-arg ref="singleConnectionFactory"/>
    		<property name="sessionCacheSize" value="100" /> 
    	</bean>   
    	
        <!-- <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="sinkQueue"/>
        </bean> -->
    	
         
    </beans>
    Client receive:
    Code:
    public class ClientReceive implements MessageListener{
    	Destination receiveQueue = null;
    	Destination sinkQueue = null;
    	ClientSend clientSender = null;
    	private AtomicInteger counter = new AtomicInteger(0);
    			
    	public ClientSend getClientSender() {
    		return clientSender;
    	}
    	public void setClientSender(ClientSend clientSender) {
    		this.clientSender = clientSender;
    	}
    	public Destination getSinkQueue() {
    		return sinkQueue;
    	}
    	public void setSinkQueue(Destination sinkQueue) {
    		this.sinkQueue = sinkQueue;
    	}
    	
    	public Destination getReceiveQueue() {
    		return receiveQueue;
    	}
    	public void setReceiveQueue(Destination receiveQueue) {
    		this.receiveQueue = receiveQueue;
    	}
    	public void start() throws JMSException {
    		clientSender.start(id, verbose); 
    	}
    	public void onMessage(Message message) { 
    		try {
    			message.acknowledge();//otherwise the messages get stuck, check why?????
    
    			LogicResponse response = (LogicResponse)(((ObjectMessage)message).getObject());
    			
    			// send to sink notification
    			clientSender.sendRequest(null, sinkQueue);
    
    		} 
    		catch (Exception e) {
    				e.printStackTrace();
    		}
    	}
    	public class SinkMessageCreator implements MessageCreator {
    
        	//message is of type javax.jms.Message
        	private Message message = null;
        	private Destination destination = null;
        	
        	public SinkMessageCreator(Destination destination)
        	{
        		this.destination = destination;
        	}
        	
     public Message createMessage(Session session) throws JMSException 
        	{
        		message = session.createTextMessage();
        		return message;
        	}
    	}
    Client send:
    Code:
    public class ClientSend{
    	Destination sendQueue = null;
        private AtomicInteger counter = new AtomicInteger(0);
    
    	JmsTemplate jmsTemplate = null;
    	
    	public JmsTemplate getJmsTemplate() {
    		return jmsTemplate;
    	}
    	public void setJmsTemplate(JmsTemplate jmsTemplate) {
    		this.jmsTemplate = jmsTemplate;
    	}
    	public Destination getSendQueue() {
    		return sendQueue;
    	}
    	public void setSendQueue(Destination sendQueue) {
    		this.sendQueue = sendQueue;
    	}
    	
    	public void start(String id, boolean verbose) throws JMSException {
    		this.id = id;
    		this.verbose = verbose;
    	}
    
    	public void sendRequest(LogicRequest request) throws JMSException {
    		sendRequest(request, sendQueue);
    	}
    	public void sendRequest(LogicRequest request, Destination queue) throws JMSException {
    		jmsTemplate.send(queue, new RequestMessageCreator(request));
    
    	}
    	
    	public class RequestMessageCreator implements MessageCreator {
    
        	//message is of type javax.jms.Message
        	private Message message = null;
        	private Object request = null;
        	private String correlationId = null;
        	
        	public RequestMessageCreator(Object request)
        	{
        		this.request = request;
        	}
        	
        	public Message createMessage(Session session) throws JMSException 
        	{
        		correlationId = UUID.randomUUID().toString(); 
        		 
        		if (this.request != null){
        			message = session.createObjectMessage((Serializable)this.request);
        		}
        		else{
        			message = session.createTextMessage();
        		}
        		message.setJMSCorrelationID(correlationId);
        		return message;
        	}
    
    		public Object getRequest() {
    			return request;
    		}
    
    		public void setRequest(Object request) {
    			this.request = request;
    		}
    
        }
    Last edited by michals; Dec 6th, 2012 at 02:22 AM.

  9. #9
    Join Date
    Jun 2006
    Location
    The Netherlands
    Posts
    13,695

    Default

    It must be me but I only see Spring stuff no plain JMS stuff. Also could you seperate the classes in different code blocks one big gulp is n't really readable.
    Marten Deinum
    Java Consultant / Pragmatist / Open Source Enthousiast / Author


    Pro Spring MVC: With Web Flow
    Conspect

    Have you read the reference guide.
    Use the [ code ] tags, young padawan

  10. #10
    Join Date
    Jul 2012
    Posts
    27

    Default

    oh, i just added the spring test. i will also add the test which does not use spring.
    I also fixed the previous post to be mode readable.

    Here is the code for just plain AMQ:

    Server:
    Code:
    public class Server implements MessageListener {
    	BrokerService broker = null;
    	String brokerUrl = new String("failover:(tcp://localhost:61616)?randomize=false");//failover:(tcp://localhost:61616)
    	Session session;
    	MessageProducer producer = null;
    	MessageConsumer consumer = null;
    	private AtomicInteger counter = new AtomicInteger(0);
    	ActiveMQConnectionFactory connectionFactory = null;
    	Connection connection = null;
    	Destination tasksQueue = null;
    	
    	public void start() throws Exception {
    //		createBroker(); 
    		setupConsumer();
    	}
    	
    	private void createBroker() throws Exception { 
    		broker = new BrokerService(); 
    		broker.setPersistent(false); 
    		broker.setUseJmx(false); 
    		broker.addConnector(brokerUrl); 
    		broker.start();
    	}
    	private void setupConsumer() throws JMSException {
    		connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
    		connection = connectionFactory.createConnection(); 
    		connection.start(); 
    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    		tasksQueue = session.createQueue(JmsConfiguration.AGENT_QUEUE_NAME);
    		producer = session.createProducer(null); 
    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		consumer = session.createConsumer(tasksQueue);
    		
    		consumer.setMessageListener(this); 
    	}
    	public void onMessage(Message message) {
    	try {
    		LogicResponse response = null;
    			if (message instanceof ObjectMessage) {
    				LogicRequest request = (LogicRequest)(((ObjectMessage)message).getObject());
    				String param = (String)request.getParam("test");
    				response = new LogicResponse(handleRequest(param));
    			}
    			ObjectMessage responseObj = session.createObjectMessage(response);
    			responseObj.setJMSCorrelationID(message.getJMSCorrelationID());
    			producer.send(message.getJMSReplyTo(), responseObj); 
    		} 
    		catch (JMSException e) {
    			e.printStackTrace(); 
    		}
    	}
    	
    	public String handleRequest(String messageText) { 
    		return "Response to '" + messageText + "'";
    	} 
    	public void stop() throws Exception { 
    		if (producer != null){
    			producer.close();
    		}
    
    		if (consumer != null){
    			consumer.setMessageListener(null);
    			consumer.close();
    			consumer = null;
    		}
    		if (session != null){
    			session.close(); 
    		}
    		if (broker != null){
    			broker.stop();
    		}
    		if (connection != null){
    			connection.stop();
    		}
    		
    	
    	}
    Client:
    Code:
    public class Client implements MessageListener{
    	Connection connection = null;
    	Session session = null;
    	MessageProducer producer = null;
    	MessageProducer sinkProducer = null;
    	MessageConsumer consumer = null;
    	Destination tasksQueue = null;
    	Destination sinkQueue = null;
    	Destination responseQueue = null;
    	String jmsBrokerIp = ManagerConsts.JMS_BROKER_IP;//System.getProperty(ManagerConsts.JMS_BROKER_IP);
        String jmsBrokerPort = ManagerConsts.JMS_BROKER_PORT;//System.getProperty(ManagerConsts.JMS_BROKER_PORT);
        String jmsBrokerUrl = new String("failover:(tcp://" + jmsBrokerIp + ":" + jmsBrokerPort+")?randomize=false");//failover:(tcp://localhost:61616)
        private AtomicInteger count = new AtomicInteger(0);
    	long end = 0;
    	long start = 0;
    	
    	public void start() throws JMSException {
    		ActiveMQConnectionFactory connectionFactory = 
    		new ActiveMQConnectionFactory(jmsBrokerUrl);
    		connection = connectionFactory.createConnection(); 
    		connection.start(); 
    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    	
    		tasksQueue = session.createQueue(JmsConfiguration.AGENT_QUEUE_NAME);
    		producer = session.createProducer(tasksQueue); 
    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		responseQueue = session.createQueue(JmsConfiguration.AGENT_DEST_QUEUE_NAME);
    		consumer = session.createConsumer(responseQueue);
    		consumer.setMessageListener(this);
    		
    		sinkQueue = session.createQueue(JmsConfiguration.SINK_QUEUE);
    		sinkProducer = session.createProducer(sinkQueue); 
    		sinkProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		
    	}
    	public void request(LogicRequest request) throws JMSException {
    		ObjectMessage message = session.createObjectMessage(request);
    		message.setJMSReplyTo(responseQueue);
    		String correlationId = UUID.randomUUID().toString(); 
    		message.setJMSCorrelationID(correlationId); 
    		this.producer.send(message);
    	}
    	
    	public void onMessage(Message message) { 
    		try {
    			int countCurr = count.incrementAndGet();
    			
    			TextMessage stam = session.createTextMessage();
    			this.sinkProducer.send(stam);
    
    		} 
    		catch (Exception e) {
    				e.printStackTrace();
    		}
    	}
    	public void stop() throws JMSException { 
    		producer.close(); 
    		consumer.close(); 
    		session.close(); 
    		connection.close();
    	}
    Sink:
    Code:
    public class Sink implements MessageListener{
    	Connection connection = null;
    	Session session = null;
    //	String brokerUrl = "tcp://localhost:61616";
    	MessageConsumer consumer = null;
    	
    	Destination sinkQueue = null;
    	String jmsBrokerIp = ManagerConsts.JMS_BROKER_IP;//System.getProperty(ManagerConsts.JMS_BROKER_IP);
        String jmsBrokerPort = ManagerConsts.JMS_BROKER_PORT;//System.getProperty(ManagerConsts.JMS_BROKER_PORT);
        String jmsBrokerUrl = new String("failover:(tcp://" + jmsBrokerIp + ":" + jmsBrokerPort+")?randomize=false");//failover:(tcp://localhost:61616)
        private AtomicInteger count = new AtomicInteger(0);
    	long end = 0;
    	long start = 0;
    	
    	public void start() throws JMSException {
    		ActiveMQConnectionFactory connectionFactory = 
    		new ActiveMQConnectionFactory(jmsBrokerUrl);
    		connection = connectionFactory.createConnection(); 
    		connection.start(); 
    		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    		sinkQueue = session.createQueue(JmsConfiguration.SINK_QUEUE);
    		consumer = session.createConsumer(sinkQueue);
    		consumer.setMessageListener(this);
    	}
    
    	public void onMessage(Message message) { 
    		try {
    //			message.acknowledge();
    			int countCurr = count.incrementAndGet();
    			
    			if (countCurr == 1){
    				System.out.println("started timing ...");
    				start = System.currentTimeMillis();// ////////////////TEST
    			}
    
    			else if (countCurr == (RaiseClient.COUNT_MESSAGES*RaiseClient.CONTROLLERS_COUNT)){
    			
    				end = System.currentTimeMillis();// ////////////////TEST
    				printDebugTime(start, end, "TestSink:"+countCurr);
    				stop();
    				System.exit(1);
    			}
    			System.out.print(countCurr+".");
    			System.out.flush();
    
    		} 
    		catch (Exception e) {
    				e.printStackTrace();
    		}
    	}
    	public void stop() throws JMSException { 
    		consumer.close(); 
    		session.close(); 
    		connection.close();
    	} 
    		
    }
    Last edited by michals; Dec 6th, 2012 at 02:23 AM.

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •