Hi Robbie,

Thanks for your super helpful reply! I'll go with a fixed reply queue and a
dedicated thread for receiving replies.
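
For the archive, here is roughly the shape I have in mind for the
correlation part. It is only a sketch under my own assumptions, not
Artemis or JMS code: a plain BlockingQueue stands in for the fixed reply
queue, and the names ReplyCorrelator, Reply and register are made up for
illustration. In the real client the receiver thread would own its own
Session and call receive() on a MessageConsumer, then read
getJMSCorrelationID() from each reply.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReplyCorrelator {

    /** Hypothetical stand-in for a JMS reply: correlation id plus text body. */
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Assumption: stands in for the fixed reply queue on the broker.
    private final BlockingQueue<Reply> replyQueue;

    ReplyCorrelator(BlockingQueue<Reply> replyQueue) {
        this.replyQueue = replyQueue;
    }

    /** Starts the single dedicated thread that consumes replies. */
    void start() {
        Thread receiver = new Thread(() -> {
            try {
                while (true) {
                    Reply reply = replyQueue.take(); // blocking, like a synchronous receive
                    CompletableFuture<String> waiting = pending.remove(reply.correlationId);
                    if (waiting != null) {
                        waiting.complete(reply.body);
                    }
                    // Replies with an unknown id are dropped in this sketch.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reply-receiver");
        receiver.setDaemon(true);
        receiver.start();
    }

    /** Records a pending request; call this before the request is sent. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Reply> queue = new LinkedBlockingQueue<>();
        ReplyCorrelator correlator = new ReplyCorrelator(queue);
        correlator.start();

        CompletableFuture<String> first = correlator.register("id-1");
        CompletableFuture<String> second = correlator.register("id-2");

        // Replies may arrive in any order; the map sorts them out.
        queue.put(new Reply("id-2", "reply for 2"));
        queue.put(new Reply("id-1", "reply for 1"));

        System.out.println(first.get(1, TimeUnit.SECONDS));
        System.out.println(second.get(1, TimeUnit.SECONDS));
    }
}
```

Registering the future before sending matters: otherwise a fast reply
could arrive before the map entry exists and be dropped.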

Again, thanks a lot!
Yann


On Thu, Nov 18, 2021 at 2:42 PM Robbie Gemmell <robbie.gemm...@gmail.com>
wrote:

> Setting a MessageListener on a consumer dedicates the Session to its
> asynchronous message delivery thread of control [while the connection
> is started]. So sending with one thread and also having listener(s)
> being delivered consumed messages on the same session is still
> multi-threading the session.
>
> Using synchronous receive(..) calls to consume replies, instead of a
> MessageListener, might be more appropriate if you want/need to use a
> single session for send+receive. You would then only have one
> application thread using the session. Alternatively, MessageListener
> callbacks can also send.
>
> The comments are suggesting that you can use the same queue for
> multiple responses [concurrently] by mapping each arriving response to
> a particular request through a JMSCorrelationID value on the message:
> you include it on the request, and the responder then sets it on its
> reply so that you can correlate them. A reply-to queue does not need to
> be a TemporaryQueue either way; you can also use fixed queues, and
> often you might need to, depending on your reliability needs for the
> responses etc.
>
>
> On Wed, 17 Nov 2021 at 18:25, Yann Massard <yam...@gmail.com> wrote:
> >
> > Hi,
> >
> > working with Artemis, I am trying to understand how to reuse a session
> > and corresponding producer and consumer in a request-reply scenario. I
> > have started with the RequestReplyExample and tried to make it work with
> > multiple requests/responses. After all, the example's comments say:
> >
> > "Of course, in a real world example you would re-use the session,
> > producer, consumer and temporary queue and not create a new one for each
> > message!"
> >
> > Since I know that sessions should not be used by multiple threads, I
> > make sure that all messages are sent through the same thread.
> >
> > (My code is at the very bottom of this message.)
> >
> > However, I still get exceptions:
> >
> > Nov 17, 2021 6:42:58 PM
> > org.apache.activemq.artemis.core.client.impl.ClientSessionImpl startCall
> > WARN: AMQ212051: Invalid concurrent session usage. Sessions are not
> > supposed to be used by more than one thread concurrently.
> > java.lang.Exception: trace
> >      at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1587)
> >      at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:1209)
> >      at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1117)
> >      at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:788)
> >      at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:136)
> >      at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:38)
> >      at org.apache.activemq.artemis.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:136)
> >      at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
> >      at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
> >      at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
> >      at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
> >      at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> >      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >      at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> >
> >
> > As soon as I remove the client's MessageListener, the exceptions are
> > gone. So I assume that sending the ACK (see stack trace) is done by
> > another thread and that might be the problem. However, I don't know how
> > to change this.
> >
> > Can anybody give me a hint how to proceed?
> >
> > Btw. the comment also says:
> >
> > "Or better still use the correlation id, and just store the requests in
> > a map, then you don't need a temporary queue at all"
> >
> > I am very interested but have no idea how this is supposed to work.
> > Which queue should the responses be sent through?
> >
> > Any help is greatly appreciated!
> >
> > Thank you!
> >
> > Yann
> >
> >
> > package org.apache.activemq.artemis.jms.example;
> >
> > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
> >
> > import javax.jms.*;
> > import java.util.concurrent.Executor;
> > import java.util.concurrent.Executors;
> >
> > public class RequestReplyExample {
> >
> >    public static void main(final String[] args) throws Exception {
> >       new Server().start();
> >       Client client = new Client();
> >       for (int i = 0; i < 100; i++) {
> >          client.sendJob("message-" + i);
> >       }
> >    }
> > }
> >
> > class Client {
> >
> >    private final Executor executor = Executors.newSingleThreadExecutor();
> >    private final Session session;
> >    private final TemporaryQueue replyQueue;
> >    private final MessageProducer producer;
> >
> >    public Client() throws JMSException {
> >       var cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
> >       Connection connection = cf.createConnection();
> >       connection.start();
> >       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >
> >       Queue requestQueue = session.createQueue("queue1");
> >       producer = session.createProducer(requestQueue);
> >
> >       replyQueue = session.createTemporaryQueue();
> >       MessageConsumer replyConsumer = session.createConsumer(replyQueue);
> >       replyConsumer.setMessageListener(replyMessage -> {
> >          try {
> >             TextMessage reply = (TextMessage) replyMessage;
> >             System.out.println("Got reply: " + reply.getText());
> >             // use correlationId to correlate with request...
> >          } catch (JMSException e) {
> >             e.printStackTrace();
> >          }
> >       });
> >    }
> >
> >    public void sendJob(String json) {
> >       executor.execute(() -> { // make sure the session is only used by a single thread!
> >          try {
> >             TextMessage msg = session.createTextMessage(json);
> >             msg.setJMSReplyTo(replyQueue);
> >             System.out.println("Sending message: " + json + " with replyTo: " + replyQueue);
> >             producer.send(msg);
> >          } catch (JMSException e) {
> >             e.printStackTrace();
> >          }
> >       });
> >    }
> >
> > }
> >
> > class Server {
> >    private Connection connection;
> >
> >    public void start() throws Exception {
> >       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
> >       connection = cf.createConnection();
> >
> >       connection.start();
> >       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >       Queue requestQueue = session.createQueue("queue1");
> >       MessageProducer replyProducer = session.createProducer(null);
> >       MessageConsumer requestConsumer = session.createConsumer(requestQueue);
> >       requestConsumer.setMessageListener(request -> {
> >          try {
> >             System.out.println("Received request message: " + ((TextMessage) request).getText());
> >             Destination replyDestination = request.getJMSReplyTo();
> >             System.out.println("Reply to queue: " + replyDestination);
> >             TextMessage replyMessage = session.createTextMessage("A reply message");
> >             replyMessage.setJMSCorrelationID(request.getJMSMessageID());
> >             replyProducer.send(replyDestination, replyMessage);
> >             System.out.println("Reply sent");
> >          } catch (JMSException e) {
> >             e.printStackTrace();
> >          }
> >       });
> >    }
> >
> >    public void shutdown() throws JMSException {
> >       connection.close();
> >    }
> > }
> >
> >
>
