Hello Claus, hello Jim!

We also using fixed persistent reply to queue which are *exclusive* to
Camel. We have to do this, because we must not loose messages which is
potentially the case for temp queues, as I understood.

Best,
Christian

On Sat, Jul 9, 2011 at 5:52 PM, Claus Ibsen <[email protected]> wrote:

> Hi Jim
>
> I have created a ticket and posted some comments about the issue
> https://issues.apache.org/jira/browse/CAMEL-4202
>
> Are you using a fixed reply to queue that is *exclusive* to the Camel
> route?
> Or is the queue used for other purposes as well?
>
> Is there a special reason why you want to use a fixed reply to queue?
>
>
>
> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham <[email protected]>
> wrote:
> >
> > Hi Claus,
> >
> > I enabled trace logging.  I'm attaching the logs (for both client and
> > server; both with and without custom replyTo) as a zip file -- not sure
> if
> > the mailing list will filter it, we'll see.
> >
> > I see that there are 5 messages in the client log which only appear when
> a
> > custom replyTo is specified:  "Running purge task to see if any entries
> has
> > been timed out", "There are 1 in the timeout map", "did not receive a
> > message", etc.  Here's an excerpt from each client log, for one exchange:
> >
> > From log for client without replyTo:
> >
> > 2011-07-08 10:55:32,354 [main] TRACE
> > org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> > 2011-07-08 10:55:32,361 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Executing
> > callback on JMS Session: ActiveMQSession
> > {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
> > 2011-07-08 10:55:32,361 [main] TRACE
> > org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> > 2011-07-08 10:55:32,362 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Sending
> > JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
> =
> > 0, responseRequired = false, messageId = null, originalDestination =
> null,
> > originalTransactionId = null, producerId = null, destination = null,
> > transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> > brokerInTime = 0, brokerOutTime = 0, correlationId =
> > ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
> > false, type = null, priority = 0, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
> > false, droppable = false, text = abc}
> > 2011-07-08 10:55:32,363 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
> JMS
> > message to: queue://dest with message: ActiveMQTextMessage {commandId =
> 0,
> > responseRequired = false, messageId =
> > ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
> > null, originalTransactionId = null, producerId = null, destination =
> > queue://dest, transactionId = null, expiration = 1310158552362, timestamp
> =
> > 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> > correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
> > true, type = null, priority = 4, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
> > false, droppable = false, text = abc}
> > 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
> Received
> > reply message with correlationID:
> ID-rsi-eng-newsham-61472-1310158530715-0-4
> > -> ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId
> =
> > ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
> > null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
> =
> > null, expiration = 0, timestamp = 1310158532367, arrival = 0,
> brokerInTime =
> > 1310158532367, brokerOutTime = 1310158532367, correlationId =
> > ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
> > true, type = null, priority = 4, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
> true,
> > droppable = false, text = reply}
> > 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
> > org.apache.camel.component.jms.JmsBinding - Extracting body as a
> TextMessage
> > from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
> > true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
> > originalDestination = null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
> > temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
> =
> > null, expiration = 0, timestamp = 1310158532367, arrival = 0,
> brokerInTime =
> > 1310158532367, brokerOutTime = 1310158532367, correlationId =
> > ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
> > true, type = null, priority = 4, groupID = null, groupSequence = 0,
> > targetConsumerId = null, compressed = false, userID = null, content =
> null,
> > marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
> > size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
> true,
> > droppable = false, text = reply}
> > 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
> > received. Setting reply as OUT message: reply
> > received reply in: 0.015 s
> >
> > From log for client with replyTo:
> >
> > 2011-07-08 10:52:10,075 [main] TRACE
> > org.apache.camel.component.jms.JmsProducer - Using inOut jms template
> > 2011-07-08 10:52:10,081 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Executing
> > callback on JMS Session: ActiveMQSession
> > {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
> > 2011-07-08 10:52:10,082 [main] TRACE
> > org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
> > 2011-07-08 10:52:10,082 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
> Sending
> > JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
> =
> > 0, responseRequired = false, messageId = null, originalDestination =
> null,
> > originalTransactionId = null, producerId = null, destination = null,
> > transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
> > brokerInTime = 0, brokerOutTime = 0, correlationId =
> > ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
> > persistent = false, type = null, priority = 0, groupID = null,
> groupSequence
> > = 0, targetConsumerId = null, compressed = false, userID = null, content
> =
> > null, marshalledProperties = null, dataStructure = null,
> redeliveryCounter =
> > 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody
> =
> > false, droppable = false, text = abc}
> > 2011-07-08 10:52:10,083 [main] DEBUG
> > org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
> JMS
> > message to: queue://dest with message: ActiveMQTextMessage {commandId =
> 0,
> > responseRequired = false, messageId =
> > ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
> > null, originalTransactionId = null, producerId = null, destination =
> > queue://dest, transactionId = null, expiration = 1310158350082, timestamp
> =
> > 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
> > correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
> > queue://replyQueue, persistent = true, type = null, priority = 4, groupID
> =
> > null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID
> > = null, content = null, marshalledProperties = null, dataStructure =
> null,
> > redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> > false, readOnlyBody = false, droppable = false, text = abc}
> > 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> > JmsReplyManagerTimeoutChecker[dest] TRACE
> > org.apache.camel.component.jms.reply.CorrelationMap - Running purge task
> to
> > see if any entries has been timed out
> > 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
> > JmsReplyManagerTimeoutChecker[dest] TRACE
> > org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
> > timeout map
> > 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
> >
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> > - Consumer [ActiveMQMessageConsumer {
> > value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
> > session [ActiveMQSession
> > {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
> > receive a message
> > 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
> > org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using
> >
> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> >
> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
> > - Received message of type [class
> > org.apache.activemq.command.ActiveMQTextMessage] from consumer
> > [ActiveMQMessageConsumer {
> > value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
> > session [ActiveMQSession
> > {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
> Received
> > reply message with correlationID:
> ID-rsi-eng-newsham-61454-1310158327407-0-4
> > -> ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId
> =
> > ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
> > null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> > queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> > 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
> > 1310158331077, correlationId =
> ID-rsi-eng-newsham-61454-1310158327407-0-4,
> > replyTo = null, persistent = true, type = null, priority = 4, groupID =
> > null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID
> > = null, content = null, marshalledProperties = null, dataStructure =
> null,
> > redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> > true, readOnlyBody = true, droppable = false, text = reply}
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
> > org.apache.camel.component.jms.JmsBinding - Extracting body as a
> TextMessage
> > from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
> > true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
> > originalDestination = null, originalTransactionId = null, producerId =
> > ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
> > queue://replyQueue, transactionId = null, expiration = 0, timestamp =
> > 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
> > 1310158331077, correlationId =
> ID-rsi-eng-newsham-61454-1310158327407-0-4,
> > replyTo = null, persistent = true, type = null, priority = 4, groupID =
> > null, groupSequence = 0, targetConsumerId = null, compressed = false,
> userID
> > = null, content = null, marshalledProperties = null, dataStructure =
> null,
> > redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
> > true, readOnlyBody = true, droppable = false, text = reply}
> > 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
> > org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
> > received. Setting reply as OUT message: reply
> > received reply in: 1.004 s
> >
> > Thanks,
> > Jim
> >
> >
> >
> > On 7/7/2011 10:59 PM, Claus Ibsen wrote:
> >>
> >> Can you enable TRACE logging at org.apache.camel.component.jms and run
> >> it for both examples.
> >> To see what happens.
> >>
> >>
> >>
> >> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<[email protected]>
> >>  wrote:
> >>>
> >>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
> I
> >>> specify a custom replyTo destination on the endpoint url, the time it
> >>> takes
> >>> for the producer to receive a reply increases drastically.  The curious
> >>> thing is that the time to receive a reply is almost exactly 1 second.
> >>>  When
> >>> I remove the replyTo from the url, everything's fast again.
> >>>
> >>> I created a very simple, stand-alone test to demonstrate what I'm
> seeing.
> >>>  There is a server class [4] which runs an embedded instance of
> ActiveMQ
> >>> and
> >>> simply replies to messages as they arrive; and a client [3] class which
> >>> simply sends messages to the server, and prints the elapsed time.  The
> >>> USE_REPLY_TO symbolic constant in the client determines whether a
> replyTo
> >>> value is added to the url or not.
> >>>
> >>> The client output when USE_REPLY_TO is false is shown as [1].  The
> client
> >>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
> >>> trivial.  Am I doing something wrong, or is this a Camel and/or
> ActiveMQ
> >>> issue?
> >>>
> >>> Thanks!
> >>> Jim
> >>>
> >>>
> >>> [1] USE_REPLY_TO = false
> >>>
> >>> received reply in: 0.476 s
> >>> received reply in: 0.006 s
> >>> received reply in: 0.006 s
> >>> received reply in: 0.006 s
> >>> received reply in: 0.006 s
> >>> ...
> >>>
> >>>
> >>> [2] USE_REPLY_TO = true
> >>>
> >>> received reply in: 1.524 s
> >>> received reply in: 1.002 s
> >>> received reply in: 1.003 s
> >>> received reply in: 1.003 s
> >>> received reply in: 1.002 s
> >>> ...
> >>>
> >>>
> >>> [3] TestReplyToClient.java
> >>>
> >>> package test;
> >>>
> >>> import org.apache.activemq.ActiveMQConnectionFactory;
> >>> import org.apache.activemq.camel.component.ActiveMQComponent;
> >>> import org.apache.camel.CamelContext;
> >>> import org.apache.camel.ProducerTemplate;
> >>> import org.apache.camel.impl.DefaultCamelContext;
> >>>
> >>> public class TestReplyToClient {
> >>>
> >>>  private static final boolean USE_REPLY_TO = false;
> >>>
> >>>  public static void main(String... args) throws Exception {
> >>>    // create camel context; configure activemq component for
> >>> tcp://localhost:7001
> >>>    CamelContext context = new DefaultCamelContext();
> >>>    ActiveMQComponent activemqComponent =
> >>> ActiveMQComponent.activeMQComponent();
> >>>    activemqComponent.setConnectionFactory(new
> ActiveMQConnectionFactory(
> >>>      null, null, "tcp://localhost:7001"));
> >>>    context.addComponent("activemq", activemqComponent);
> >>>    context.start();
> >>>
> >>>    // define url to send requests to
> >>>    String sendUrl = "activemq:queue:dest";
> >>>    if (USE_REPLY_TO) {
> >>>      sendUrl += "?replyTo=replyQueue";
> >>>    }
> >>>    System.err.println("sending to url: " + sendUrl);
> >>>
> >>>    // repeatedly send requests; measure elapsed time
> >>>    ProducerTemplate template = context.createProducerTemplate();
> >>>    while (true) {
> >>>      long startNanos = System.nanoTime();
> >>>      template.requestBody(sendUrl, "abc");
> >>>      long elapsedNanos = System.nanoTime() - startNanos;
> >>>      System.err.println(String.format("received reply in: %.3f s",
> >>> elapsedNanos / 1000000000.0));
> >>>    }
> >>>  }
> >>>
> >>> }
> >>>
> >>>
> >>> [4] TestReplyToServer.java
> >>>
> >>> package test;
> >>>
> >>> import org.apache.activemq.broker.BrokerService;
> >>> import org.apache.activemq.camel.component.ActiveMQComponent;
> >>> import org.apache.camel.CamelContext;
> >>> import org.apache.camel.Exchange;
> >>> import org.apache.camel.Processor;
> >>> import org.apache.camel.builder.RouteBuilder;
> >>> import org.apache.camel.impl.DefaultCamelContext;
> >>>
> >>> public class TestReplyToServer {
> >>>
> >>>  private static final String BROKER_NAME = "thebroker";
> >>>
> >>>  public static void main(String... args) throws Exception {
> >>>    startBroker();
> >>>    startCamel();
> >>>    Thread.sleep(Long.MAX_VALUE);
> >>>  }
> >>>
> >>>  private static void startBroker() throws Exception {
> >>>    BrokerService brokerService = new BrokerService();
> >>>    brokerService.setBrokerName(BROKER_NAME);
> >>>    brokerService.setSchedulerSupport(false);
> >>>    brokerService.setPersistent(false);
> >>>    brokerService.addConnector("tcp://0.0.0.0:7001");
> >>>    brokerService.start();
> >>>    brokerService.waitUntilStarted();
> >>>  }
> >>>
> >>>
> >>>  private static void startCamel() throws Exception {
> >>>    CamelContext context = new DefaultCamelContext();
> >>>
> >>>    ActiveMQComponent activemqComponent =
> >>> ActiveMQComponent.activeMQComponent();
> >>>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
> >>> BROKER_NAME));
> >>>    context.addComponent("activemq", activemqComponent);
> >>>
> >>>    final String receiveUrl = "activemq:queue:dest";
> >>>    context.addRoutes(new RouteBuilder() {
> >>>      @Override
> >>>      public void configure() throws Exception {
> >>>        from(receiveUrl).process(new Processor() {
> >>>          @Override
> >>>          public void process(Exchange exchange) throws Exception {
> >>>            System.err.println("received request");
> >>>            exchange.getOut().setBody("reply");
> >>>          }
> >>>        });
> >>>      }
> >>>    });
> >>>
> >>>    context.start();
> >>>    System.err.println("listening on url: " + receiveUrl);
> >>>  }
> >>>
> >>> }
> >>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> FuseSource
> Email: [email protected]
> Web: http://fusesource.com
> Twitter: davsclaus, fusenews
> Blog: http://davsclaus.blogspot.com/
> Author of Camel in Action: http://www.manning.com/ibsen/
>

Reply via email to