spring-client.xml:
Code:
!-- ********************** Consumer ********************************* -->
<bean id="clientListener" class="example.ClientReceive">
<property name="receiveQueue" ref="receiveQueue" />
<property name="sinkQueue" ref="sinkQueue" />
<property name="clientSender" ref="clientSender"/>
</bean>
<bean id="containerClient"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="20"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50,20 -->
<property name="maxConcurrentConsumers" value="50"/><!-- if it is not 1 - it is difficult to see distribution to other services!!!change to 50-->
<property name="idleTaskExecutionLimit" value="10"/>
<!-- <property name="maxMessagesPerTask" value="30"/> -->
<property name="connectionFactory" ref="singleConnectionFactory" />
<property name="destination" ref="receiveQueue" />
<property name="messageListener" ref="clientListener" />
<property name="sessionAcknowledgeMode" value="1"/>
</bean>
<bean id="receiveQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queueA"/>
</bean>
<bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="sinkQueue"/>
</bean>
<!-- ********************** END-Consumer ***************************** -->
<!-- ********************** Producer ********************************* -->
<bean id="clientSender" class="example.ClientSend">
<property name="sendQueue" ref="sendQueue" />
<property name="jmsTemplate" ref="jmsTemplate"/>
</bean>
<bean id="sendQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queueB"/>
</bean>
<!-- for send response on temp q-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" lazy-init="true">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
</bean>
<!-- ********************** END Producer ********************************* -->
<!-- ********************** Common ********************************* -->
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://localhost:61616)?randomize=false" /> <!-- Single broker-->
</bean>
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="connectionFactory" />
<property name="reconnectOnException" value="true"/>
</bean>
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory">
<constructor-arg ref="connectionFactory"/>
<property name="maxConnections" value="10" />
</bean>
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="singleConnectionFactory"/>
<property name="sessionCacheSize" value="100" />
</bean>
<!-- <bean id="sinkQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="sinkQueue"/>
</bean> -->
</beans>
Client receive:
Code:
public class ClientReceive implements MessageListener{
Destination receiveQueue = null;
Destination sinkQueue = null;
ClientSend clientSender = null;
private AtomicInteger counter = new AtomicInteger(0);
public ClientSend getClientSender() {
return clientSender;
}
public void setClientSender(ClientSend clientSender) {
this.clientSender = clientSender;
}
public Destination getSinkQueue() {
return sinkQueue;
}
public void setSinkQueue(Destination sinkQueue) {
this.sinkQueue = sinkQueue;
}
public Destination getReceiveQueue() {
return receiveQueue;
}
public void setReceiveQueue(Destination receiveQueue) {
this.receiveQueue = receiveQueue;
}
public void start() throws JMSException {
clientSender.start(id, verbose);
}
public void onMessage(Message message) {
try {
message.acknowledge();//otherwise the messages get stuck, check why?????
LogicResponse response = (LogicResponse)(((ObjectMessage)message).getObject());
// send to sink notification
clientSender.sendRequest(null, sinkQueue);
}
catch (Exception e) {
e.printStackTrace();
}
}
public class SinkMessageCreator implements MessageCreator {
//message is of type javax.jms.Message
private Message message = null;
private Destination destination = null;
public SinkMessageCreator(Destination destination)
{
this.destination = destination;
}
public Message createMessage(Session session) throws JMSException
{
message = session.createTextMessage();
return message;
}
}
Client send:
Code:
public class ClientSend{
Destination sendQueue = null;
private AtomicInteger counter = new AtomicInteger(0);
JmsTemplate jmsTemplate = null;
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getSendQueue() {
return sendQueue;
}
public void setSendQueue(Destination sendQueue) {
this.sendQueue = sendQueue;
}
public void start(String id, boolean verbose) throws JMSException {
this.id = id;
this.verbose = verbose;
}
public void sendRequest(LogicRequest request) throws JMSException {
sendRequest(request, sendQueue);
}
public void sendRequest(LogicRequest request, Destination queue) throws JMSException {
jmsTemplate.send(queue, new RequestMessageCreator(request));
}
public class RequestMessageCreator implements MessageCreator {
//message is of type javax.jms.Message
private Message message = null;
private Object request = null;
private String correlationId = null;
public RequestMessageCreator(Object request)
{
this.request = request;
}
public Message createMessage(Session session) throws JMSException
{
correlationId = UUID.randomUUID().toString();
if (this.request != null){
message = session.createObjectMessage((Serializable)this.request);
}
else{
message = session.createTextMessage();
}
message.setJMSCorrelationID(correlationId);
return message;
}
public Object getRequest() {
return request;
}
public void setRequest(Object request) {
this.request = request;
}
}