Yes, if the failure to connect/reconnect bubbles up to the
application, the application will need to initiate a new connection.

Fire and forget in JMS terms is still mediated by the broker, and
finding a broker is currently a synchronous operation. A JMS
connection that ignores the fact that it has no broker at the
other end is a little disrespectful of the JMS contracts :-)

You have two options, both of which involve mediating access to the
target broker.

You could write a wrapper that maintains a JMS connection in a
separate thread (so it does not care about a blocking connect) and
only delegates when the connection is active, discarding messages
otherwise. This sort of fire-and-forget wrapper would be a nice
enhancement. It could be a DiscardingProducerWrapper that uses the
org.apache.activemq.transport.TransportListener#transportResumed
callback to determine the state of the connection.
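
A rough sketch of what such a wrapper could look like, written in
plain Java so that it stands alone. Everything in it is an assumption
rather than existing ActiveMQ code: the class is hypothetical, the
blocking connect is modelled as a Supplier, and the broker-facing send
is reduced to a Consumer<String>. In a real client the class would
implement org.apache.activemq.transport.TransportListener (whose
interruption callback is spelled transportInterupted) and be
registered on the ActiveMQ connection.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical fire-and-forget wrapper: it never waits for a broker,
// it simply drops messages while the transport is down.
class DiscardingProducerWrapper {
    // Stand-in for a connected JMS producer; null until connect returns.
    private final AtomicReference<Consumer<String>> delegate = new AtomicReference<>();
    private final AtomicBoolean transportUp = new AtomicBoolean(false);
    private final AtomicLong discarded = new AtomicLong();

    // Runs the (possibly blocking) connect on its own daemon thread so
    // that callers of send() are never held up by it.
    Thread connectInBackground(Supplier<Consumer<String>> blockingConnect) {
        Thread t = new Thread(() -> {
            delegate.set(blockingConnect.get());
            transportUp.set(true); // assumption: connect returning means "up"
        }, "jms-connector");
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Named after the TransportListener callbacks they would be wired to.
    public void transportInterupted() { transportUp.set(false); }
    public void transportResumed()    { transportUp.set(true); }

    // Returns true if the message was handed to the delegate,
    // false if it was discarded.
    boolean send(String body) {
        Consumer<String> d = delegate.get();
        if (d == null || !transportUp.get()) {
            discarded.incrementAndGet();
            return false;
        }
        try {
            d.accept(body);
            return true;
        } catch (RuntimeException e) {
            // A failed send is treated like a discard, never rethrown.
            discarded.incrementAndGet();
            return false;
        }
    }

    long discardedCount() { return discarded.get(); }
}
```

There is still a small window between the state check and the
delegated send in which the transport can drop, so the underlying send
may need its own bound, for example the failover timeout option
already present in the connection URL quoted below.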

You could introduce an embedded broker in the client that could
forward messages to the target broker via a network connector. It
could expire pending messages when the network connector is down.
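
To make the expiry behaviour concrete, here is a plain-Java model of
it. This is not ActiveMQ code and does not use an embedded broker; the
ExpiringForwardBuffer class and its methods are hypothetical and only
illustrate the idea that messages held while the link is down are
dropped once they outlive their time to live, instead of being
delivered late. Time is passed in explicitly to keep the sketch
deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical model of "hold locally, forward when the link is up,
// expire anything that has waited too long".
class ExpiringForwardBuffer {
    private static final class Pending {
        final String body;
        final long expiresAtMillis;
        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Pending> pending = new ArrayDeque<>();
    private final long timeToLiveMillis;

    ExpiringForwardBuffer(long timeToLiveMillis) {
        this.timeToLiveMillis = timeToLiveMillis;
    }

    // Called by the producer; never blocks on the remote side.
    synchronized void enqueue(String body, long nowMillis) {
        pending.addLast(new Pending(body, nowMillis + timeToLiveMillis));
    }

    // Called when the link to the target is available again.
    // Forwards unexpired messages in order and silently drops the rest.
    // Returns the number of messages actually forwarded.
    synchronized int forward(Consumer<String> target, long nowMillis) {
        int forwarded = 0;
        Pending p;
        while ((p = pending.pollFirst()) != null) {
            if (p.expiresAtMillis > nowMillis) {
                target.accept(p.body);
                forwarded++;
            }
        }
        return forwarded;
    }

    synchronized int pendingCount() { return pending.size(); }
}
```

With a time to live of 2000 ms (the "2 seconds or so" mentioned in the
quoted message), a message enqueued at t=0 is dropped if the link only
comes back at t=2500, while one enqueued at t=1500 is still forwarded.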


On 17 April 2012 11:41, Brook, James <jbr...@upcbroadband.com> wrote:
> Thanks Gary. I am still not completely sure I get it. I am using failover 
> because I *do* want my client to reconnect when the broker comes back up. 
> However, when it's down I don't want to queue messages or block anything - 
> they can be thrown away. My messages really are 'fire and forget'. These lose 
> their meaning if not delivered to their ultimate destination within 2 seconds 
> or so.
>
> My expectation was that I would be able to asynchronously send messages and 
> never block. I have tried using non-persistent messages, and telling the 
> transport not to track messages.
>
> If I do what you suggest won't the failover transport wait 10ms, try to 
> reconnect once and then give up forever? Wouldn't that mean someone has to 
> take manual action to reestablish the connection?
>
> Sorry if I am missing the point.
>
> On Apr 17, 2012, at 12:32 PM, Gary Tully wrote:
>
>> configure the failover maxReconnectAttempts and the failure to connect
>> will bubble up to the application where you can ignore it.
>> The assumption of failover is that you want to 'transparently'
>> maintain the connection and block pending reconnect
>>
>> failover:(...)?maxReconnectAttempts=1
>>
>> http://activemq.apache.org/failover-transport-reference.html
>>
>> On 16 April 2012 19:52, Brook, James <jbr...@upcbroadband.com> wrote:
>>> I am trying to make sure my application is resilient. It sends JMS messages 
>>> to a remote ActiveMQ broker. Unfortunately when that broker goes down the 
>>> client starts to block even though I am using the Camel asyncSendBody 
>>> method. See below for the stack trace.
>>>
>>> The ActiveMQ version is 5.3.0 although I have the same problem with the 
>>> most recent stable release.
>>>
>>> This is the argument I give to the asyncSendBody method:
>>>
>>>    jms:queue:myQueue?jmsMessageType=Text&timeToLive=5000
>>>
>>> My connection factory uses this URL:
>>>
>>>    failover:(tcp://my.ip:61613,tcp://my:ip:61613)?useExponentialBackOff=true&randomize=false&timeout=1000
>>>
>>> As the broker starts to fail I see lots of the following errors in the log:
>>>
>>>    org.apache.camel.processor.UnitOfWorkProcessor  - Caught unhandled exception while processing ExchangeId: ID-james-upcit-ds-upc-biz-64404-1334601186430-0-38
>>> IOException: Failover timeout of 1000 ms reached
>>>
>>>    org.apache.activemq.transport.failover.FailoverTransport  - Failover timed out after 29998ms
>>>
>>> Then after attempting to send quite a few more messages the blocking 
>>> starts. I can't go live with my application in this state and I really 
>>> don't want to put another queue in place to decouple the JMS producer. Is 
>>> there something I am doing wrong? If the broker is down I am quite happy 
>>> for these messages to be lost - they lose their meaning if not delivered 
>>> immediately.
>>>
>>> Thanks
>>> James
>>>
>>> at java.lang.Object.wait(Native Method)
>>>        at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:433)
>>>        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>>>        at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:74)
>>>        at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:79)
>>>        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1244)
>>>        at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1350)
>>>        at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:300)
>>>        at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:196)
>>>        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:457)
>>>        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:185)
>>>        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:359)
>>>        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:313)
>>>        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:111)
>>>        at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
>>>        at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
>>>        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
>>>        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
>>>        at org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:63)
>>>        at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:352)
>>>        at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:324)
>>>        at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:223)
>>>        at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:324)
>>>        at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:169)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:111)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:124)
>>>        at org.apache.camel.impl.DefaultProducerTemplate$14.call(DefaultProducerTemplate.java:596)
>>>        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>        at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(ThreadPoolExecutor.java:1746)
>>>        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
>>>        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
>>>        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:92)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.asyncSendBody(DefaultProducerTemplate.java:601)
>>>        at org.apache.camel.impl.DefaultProducerTemplate.asyncSendBody(DefaultProducerTemplate.java:458)
>>
>>
>>
>> --
>> http://fusesource.com
>> http://blog.garytully.com
>



-- 
http://fusesource.com
http://blog.garytully.com
