Hi Josh,

that's the job of inactivity monitor when using the OpenWire. Unfortunately
Stomp doesn't support that in version 1.0 and it is something we want to add
in the next version of the spec. Maybe implementing something like that on
the application level would help in your case?

Cheers
--
Dejan Bosanac - http://twitter.com/dejanb

Open Source Integration - http://fusesource.com/
ActiveMQ in Action - http://www.manning.com/snyder/
Blog - http://www.nighttale.net


On Wed, Apr 14, 2010 at 5:41 PM, Josh Carlson <jcarl...@e-dialog.com> wrote:

> Hmm. If a timeout was the solution to this problem how would you be able to
> tell the difference between something being wrong and the client just being
> slow.
>
> I did an strace on the server and discovered how the timeout is being used.
> As a parameter to poll
>
> 6805  10:31:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished
> ...>
> 6805  10:31:15 <... poll resumed> )     = 1 ([{fd=94, revents=POLLIN}])
> 6805  10:31:15 recvfrom(94, "CONNECT\npasscode:...."..., 8192, 0, NULL,
> NULL) = 39
> 6805  10:31:15 sendto(94, "CONNECTED\nsession:ID:mmq1-40144-"..., 53, 0,
> NULL, 0) = 53
> 6805  10:31:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000) = 1
> ([{fd=94, revents=POLLIN}])
> 6805  10:31:15 recvfrom(94, "SUBSCRIBE\nactivemq.prefetchSize:"..., 8192,
> 0, NULL, NULL) = 138
> 6805  10:31:15 sendto(94, "RECEIPT\nreceipt-id:39ef0e071a549"..., 55, 0,
> NULL, 0) = 55
> 6805  10:31:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished
> ...>
> 6805  10:32:15 <... poll resumed> )     = 0 (Timeout)
> 6805  10:32:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished
> ...>
> 6805  10:33:15 <... poll resumed> )     = 0 (Timeout)
> 6805  10:33:15 poll([{fd=94, events=POLLIN|POLLERR}], 1, 60000 <unfinished
> ...>
> 6805  10:34:15 <... poll resumed> )     = 0 (Timeout)
>
> In the output above I stripped lines that were not operations directly on
> the socket. The poll Timeouts continue on ... with nothing in between.
>
> [r...@mmq1 tmp]# lsof -p 6755 | grep mmq1
> java    6755 root   85u  IPv6            1036912                 TCP
> mmq1.eng.e-dialog.com:61613 (LISTEN)
> java    6755 root   92u  IPv6            1038039                 TCP
> mmq1.eng.e-dialog.com:61613->10.0.13.230:46542 (ESTABLISHED)
> java    6755 root   94u  IPv6            1036997                 TCP
> mmq1.eng.e-dialog.com:61613->mmd2.eng.e-dialog.com:41743 (ESTABLISHED)
>
> The connection to mmd2 is the host that is gone. The one to 10.0.13.230 is
> up and active. When I kill -9 the process on 10.0.13.230 I see this in the
> logs:
>
> 2010-04-13 17:13:55,322 | DEBUG | Transport failed: java.io.EOFException |
> org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ
> Transport: tcp:///10.0.13.230:45463
> java.io.EOFException
>        at java.io.DataInputStream.readByte(Unknown Source)
>        at
> org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:186)
>        at
> org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:94)
>        at
> org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211)
>        at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>        at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186)
>        at java.lang.Thread.run(Unknown Source)
> 2010-04-13 17:13:55,325 | DEBUG | Stopping connection: /10.0.13.230:45463| 
> org.apache.activemq.broker.TransportConnection | ActiveMQ Task
> 2010-04-13 17:13:55,325 | DEBUG | Stopping transport tcp:///
> 10.0.13.230:45463 | org.apache.activemq.transport.tcp.TcpTransport |
> ActiveMQ Task
> 2010-04-13 17:13:55,326 | DEBUG | Stopped transport: /10.0.13.230:45463 |
> org.apache.activemq.broker.TransportConnection | ActiveMQ Task
> 2010-04-13 17:13:55,327 | DEBUG | Cleaning up connection resources: /
> 10.0.13.230:45463 | org.apache.activemq.broker.TransportConnection |
> ActiveMQ Task
> 2010-04-13 17:13:55,327 | DEBUG | remove connection id:
> ID:mmq1-58415-1271193024658-2:3 |
> org.apache.activemq.broker.TransportConnection | ActiveMQ Task
> 2010-04-13 17:13:55,328 | DEBUG | masterb1 removing consumer:
> ID:mmq1-58415-1271193024658-2:3:-1:1 for destination:
> queue://Producer/TESTING/weight/three/ |
> org.apache.activemq.broker.region.AbstractRegion | ActiveMQ Task
>
> Which is what I want to happen when the host goes down.
>
> It seems to be that something should be noticing that the connection is
> really gone. Maybe this is more of a kernel issue. I would think that when
> the poll is done that it would trigger the connection to move from the
> ESTABLISHED state and get closed.
>
> We are using Linux, kernel version 2.6.18, but I've seen this same issue on
> a range of different 2.6 versions.
>
> -Josh
>
>
>
> On 04/14/2010 09:38 AM, Josh Carlson wrote:
>
>> Thanks Gary for the, as usual, helpful information.
>>
>> It looks like the broker maybe suffering from exactly the same problem
>> we encountered when implementing client-side failover. Namely that when
>> the master broker went down a subsequent read on the socket by the
>> client would hang (well actually take a very long time to fail/timeout).
>> In that case our TCP connection was ESTABLISHED and looking at the
>> broker I see the same thing after the client host goes away (the
>> connection is ESTABLISHED). We fixed this issue in our client by setting
>> the socket option SO_RCVTIMEO on the connection to the broker.
>>
>> I noted what the broker appears to do the same thing with the TCP
>> transport option soTimeout. It looks like when this is set it winds up
>> as a call to java.net.Socket.setSoTimeout when the socket is getting
>> initialized. I have not done any socket programming in Java but my
>> assumption is that SO_TIMEOUT maps to both SO_RCVTIMEO and SO_SNDTIMEO
>> in the C world.
>>
>> I was hopeful with this option but when I set in in my transport
>> connector:
>>
>> <transportConnector name="stomp"
>> uri="stomp://mmq1:61613?soTimeout=60000"/>
>>
>> the timeout does not occur. I actually ran my test case about 15 hours
>> ago and I can still see that the broker still has an ESTABLISHED
>> connection to the dead client and has a message dispatched to it.
>>
>> Am I miss understanding what soTimeout is for? I can see in
>> org.apache.activemq.transport.tcp.TcpTransport.initialiseSocket that
>> setSoTimeout is getting called unconditionally. So what I'm wondering is
>> if it is actually calling it with a 0 value despite the way I set up my
>> transport connector. I suppose setting this to 0 would explain why it
>> apparently never times out where in our client case it eventually did
>> timeout (because we were not setting the option at all before).
>>
>>
>>
>>
>> On 04/14/2010 05:15 AM, Gary Tully wrote:
>>
>>
>>> The re-dispatch is triggered by the tcp connection dying, netstat can
>>> help with the diagnosis here. Check the connection state of the broker
>>> port after the client host is rebooted, if the connection is still
>>> active, possibly in a timed_wait state, you may need to configure some
>>> additional timeout options on the broker side.
>>>
>>> On 13 April 2010 19:43, Josh Carlson<jcarl...@e-dialog.com
>>> <mailto:jcarl...@e-dialog.com>>  wrote:
>>>
>>>     I am using client acknowledgements with a prefetch size of 1 with
>>>     no message expiration policy. When a consumer subscribes to a
>>>     queue I can see that the message gets dispatched correctly. If the
>>>     process gets killed before retrieving and acknowledging the
>>>     message I see the message getting re-dispatched (correctly). I
>>>     expected this same behaviour if the host running the process gets
>>>     rebooted or crashes. However, after reboot I can see that the
>>>     message is stuck in the dispatched state to the consumer that is
>>>     long gone. Is there a way that I can get messages re-dispatched
>>>     when a host hosting consumer processes gets re-booted? How does it
>>>     detect the case when a process dies (even with SIGKILL)?
>>>
>>>     I did notice that if I increase my prefetch size and enqueue
>>>     another message after the reboot, that activemq will re-dispatch
>>>     the original message. However with prefetch size equal to one the
>>>     message never seems to get re-dispatched.
>>>
>>>
>>>
>>>
>>> --
>>> http://blog.garytully.com
>>>
>>> Open Source Integration
>>> http://fusesource.com
>>>
>>>
>>

Reply via email to