Hi Robbie, Thanks for your super helpful reply! I'll go with a fixed reply queue and a dedicated thread for receiving replies.
Again, thanks a lot! Yann On Thu, Nov 18, 2021 at 2:42 PM Robbie Gemmell <robbie.gemm...@gmail.com> wrote: > Setting a MessageListener on a consumer dedicates the Session to its > asynchronous message delivery thread of control [while the connection > is started]. So sending with one thread and also having listener(s) > being delivered consumed messages on the same session is still > multi-threading the session. > > Using a synchronous receive(..) calls to consume replies, instead of a > MessageListener, might be more appropriate than setting a listener if > you want/need to use a single session for send+recieve. You would then > only have one application thread using the session. Alternatively, > MessageListener callbacks can also send. > > The comments are suggesting that you can use the same queue for > multiple responses [concurrently] by mapping the arriving response to > a particular request through use of a JMSCorrelationID value set on > the message, which you included on the request and that the responder > then sets on its reply back so that you can correlate them. A reply-to > queue does not need to be a TemporaryQueue either way, you can also > use fixed queues, and often you might need to depending on your > reliability needs for the responses etc. > > > On Wed, 17 Nov 2021 at 18:25, Yann Massard <yam...@gmail.com> wrote: > > > > Hi, > > > > working with Artemis, I am trying to understand how to reuse a session > > and corresponding producer and consumer in a request-reply scenario. I > > have started with the RequestReplyExample and tried to make it work with > > multiple requests/responses. After all, the example's comments say: > > > > "Of course, in a real world example you would re-use the session, > > producer, consumer and temporary queue and not create a new one for each > > message!" > > > > Since I know that sessions should not be used by multiple threads, I am > > making sure, all messages are sent through the same thread. > > > > (My code is at the very bottom of this message.) > > > > However, I still get exceptions: > > > > Nov 17, 2021 6:42:58 PM > > org.apache.activemq.artemis.core.client.impl.ClientSessionImpl startCall > > WARN: AMQ212051: Invalid concurrent session usage. Sessions are not > > supposed to be used by more than one thread concurrently. > > java.lang.Exception: trace > > at > > > org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1587) > > at > > > org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:1209) > > at > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1117) > > at > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:788) > > at > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:136) > > at > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:38) > > at > > > org.apache.activemq.artemis.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:136) > > at > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013) > > at > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133) > > at > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) > > at > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) > > at > > > org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65) > > at > > > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > at > > > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > > at > > > org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) > > > > > > As soon as I remove the client's MessageListener, the exceptions are > > gone. So I assume that sending the ACK (see stack trace) is done by > > another thread and that might be the problem. However, I don't know how > > to change this. > > > > Can anybody give me a hint how to procede? > > > > Btw. the comment also says: > > > > "Or better still use the correlation id, and just store the requests in > > a map, then you don't need a temporary queue at all" > > > > I am very interested but have no idea how this is supposed to work. > > Which queue should the responses be sent through? > > > > Any help is greatly appreciated! > > > > Thank you! > > > > Yann > > > > > > package org.apache.activemq.artemis.jms.example; > > > > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; > > > > import javax.jms.*; > > import java.util.concurrent.Executor; > > import java.util.concurrent.Executors; > > > > public class RequestReplyExample { > > > > public static void main(final String[] args)throws Exception { > > new Server().start(); > > Client client =new Client(); > > for (int i =0; i <100; i++) { > > client.sendJob("message-" + i); > > } > > } > > } > > > > class Client { > > > > private final Executor executor =Executors.newSingleThreadExecutor(); > > private final Session session; > > private final TemporaryQueue replyQueue; > > private final MessageProducer producer; > > > > public Client()throws JMSException { > > var cf =new > ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1"); > > Connection connection =cf.createConnection(); > > connection.start(); > > session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE); > > > > Queue requestQueue =session.createQueue("queue1"); > > producer =session.createProducer(requestQueue); > > > > replyQueue =session.createTemporaryQueue(); > > MessageConsumer replyConsumer =session.createConsumer(replyQueue); > > replyConsumer.setMessageListener(replyMessage -> { > > try { > > TextMessage reply = (TextMessage) replyMessage; > > System.out.println("Got reply: " +reply.getText()); > > // use correlationId to correlate with request... > > }catch (JMSException e) { > > e.printStackTrace(); > > } > > }); > > } > > > > public void sendJob(String json) { > > executor.execute(() -> {// make sure the session is only used by > a single thread! try { > > TextMessage msg =session.createTextMessage(json); > > msg.setJMSReplyTo(replyQueue); > > System.out.println("Sending message: " +json +" with > replyTo: " +replyQueue); > > producer.send(msg); > > }catch (JMSException e) { > > e.printStackTrace(); > > } > > }); > > } > > > > } > > > > class Server { > > private Connection connection; > > > > public void start()throws Exception { > > ActiveMQConnectionFactory cf =new > ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1"); > > connection =cf.createConnection(); > > > > connection.start(); > > Session session > =connection.createSession(false,Session.AUTO_ACKNOWLEDGE); > > Queue requestQueue =session.createQueue("queue1"); > > MessageProducer replyProducer =session.createProducer(null); > > MessageConsumer requestConsumer > =session.createConsumer(requestQueue); > > requestConsumer.setMessageListener(request -> { > > try { > > System.out.println("Received request message: " + > ((TextMessage) request).getText()); > > Destination replyDestination = request.getJMSReplyTo(); > > System.out.println("Reply to queue: " +replyDestination); > > TextMessage replyMessage =session.createTextMessage("A > reply message"); > > replyMessage.setJMSCorrelationID(request.getJMSMessageID()); > > replyProducer.send(replyDestination,replyMessage); > > System.out.println("Reply sent"); > > }catch (JMSException e) { > > e.printStackTrace(); > > } > > }); > > } > > > > public void shutdown()throws JMSException { > > connection.close(); > > } > > } > > > > >