Hi,

I ran into a problem where a persistent message is never dispatched to a
durable subscriber (it simply goes missing), especially when the subscriber
reconnects (restarts).  Below are the test environment, the scenario, and
the problem.

*test env*
ActiveMQ 5.7.0 / JDK 1.6
All configuration is left at the defaults, except that persistence is
disabled: BrokerService#setPersistent(false)

*test purpose*
Verify that persistent messages are dispatched properly to a durable
subscriber even when the subscriber's connection is intermittent.

*test scenario*
1. A durable subscriber is created on a topic.
2. A publisher is created and publishes 30 messages (m1, m2 .. m30) to the
topic.
3. The durable subscriber reconnects/resumes after every 2 messages it
consumes.  So it is expected to reconnect 15 times before it has consumed
all 30 messages.
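
A minimal sketch of this scenario, for anyone who wants to try to reproduce
it. It is not my exact test code: the topic name "test.topic", the port,
AUTO_ACKNOWLEDGE, and the 5 second receive timeout are assumptions made for
illustration. The client ID "cli-0" and subscription name "sub-0" match the
names that appear in the log, and the subscriber is stopped with
MessageConsumer#close as in the log. The loss is sporadic, so a single run
may well pass.

```java
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

public class DurableReconnectSketch {
    public static void main(String[] args) throws Exception {
        // Embedded broker with defaults, except that persistence is disabled.
        BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61616"); // assumed port
        broker.start();

        final ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Step 1: register the durable subscription before anything is published.
        Connection subConn = factory.createConnection();
        subConn.setClientID("cli-0");
        subConn.start();
        Session subSession = subConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = subSession.createTopic("test.topic"); // assumed topic name
        subSession.createDurableSubscriber(topic, "sub-0").close();

        // Step 2: publish m1..m30 from a separate thread so that publishing can
        // overlap with the subscriber closing and reopening its consumer.
        Thread publisher = new Thread(new Runnable() {
            public void run() {
                try {
                    Connection pubConn = factory.createConnection();
                    pubConn.start();
                    Session s = pubConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer p = s.createProducer(s.createTopic("test.topic"));
                    p.setDeliveryMode(DeliveryMode.PERSISTENT);
                    for (int i = 1; i <= 30; i++) {
                        TextMessage msg = s.createTextMessage("m" + i);
                        msg.setIntProperty("seq", i);
                        p.send(msg);
                    }
                    pubConn.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        publisher.start();

        // Step 3: consume 2 messages, close the consumer, resume, and repeat.
        int received = 0;
        boolean timedOut = false;
        while (received < 30 && !timedOut) {
            TopicSubscriber sub = subSession.createDurableSubscriber(topic, "sub-0");
            for (int i = 0; i < 2; i++) {
                Message m = sub.receive(5000);
                if (m == null) {
                    timedOut = true; // nothing arrived in time: a message may be lost
                    break;
                }
                System.out.println("received m" + m.getIntProperty("seq"));
                received++;
            }
            sub.close(); // triggers the deactivation seen in the log
        }
        System.out.println("total received: " + received + " of 30");

        publisher.join();
        subConn.close();
        broker.stop();
    }
}
```

A gap in the printed sequence (for example m4 followed by m6), or a total
below 30, would correspond to the problem described next.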

*problem*
The durable subscriber sporadically misses a message (at step 3 above).
For example, it receives m6 right after m4 (m5 is missing).

According to the trace-level logs from the
org.apache.activemq.broker.region.cursors package (see below), it looks like
the deactivating thread (triggered by javax.jms.MessageConsumer#close) moves
only the inflight messages back to the pending list (in
DurableTopicSubscription#deactivate) and misses a message that is being
dispatched at that very moment, i.e. when
DurableTopicSubscription#dispatchPending is called by the publishing thread
simultaneously.
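
To make the interleaving I suspect concrete, here is a small single-threaded
model of it. This is NOT ActiveMQ's code: the class, fields, and methods are
hypothetical stand-ins for the pending list and the inflight (dispatched)
list, and the two threads are replayed as explicit steps in a fixed order so
the outcome is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DeactivateRaceModel {
    // Hypothetical stand-ins for the subscription's pending list and inflight list.
    final Deque<Integer> pending = new ArrayDeque<Integer>();
    final List<Integer> dispatched = new ArrayList<Integer>();

    // Publisher side, first half of a dispatch: take the next message off pending.
    Integer removeFromPending() {
        return pending.pollFirst();
    }

    // Publisher side, second half of a dispatch: record the message as inflight.
    void markDispatched(Integer id) {
        dispatched.add(id);
    }

    // Subscriber side: push every inflight message back to the front of pending,
    // newest first, so that the original order is restored.
    void deactivate() {
        for (int i = dispatched.size() - 1; i >= 0; i--) {
            pending.addFirst(dispatched.get(i));
        }
        dispatched.clear();
    }

    // State taken from the log: 12..26 are inflight, 27 has just been cached in pending.
    static DeactivateRaceModel stateFromLog() {
        DeactivateRaceModel m = new DeactivateRaceModel();
        for (int id = 12; id <= 26; id++) {
            m.dispatched.add(id);
        }
        m.pending.addLast(27);
        return m;
    }

    // The dispatch of 27 completes before deactivation starts.
    public static List<Integer> dispatchThenDeactivate() {
        DeactivateRaceModel m = stateFromLog();
        Integer id = m.removeFromPending();
        m.markDispatched(id);
        m.deactivate();
        return new ArrayList<Integer>(m.pending);
    }

    // Deactivation runs between the two halves of the dispatch of 27.
    public static List<Integer> interleaved() {
        DeactivateRaceModel m = stateFromLog();
        Integer id = m.removeFromPending(); // 27 leaves the pending list
        m.deactivate();                     // only 12..26 are moved back
        m.markDispatched(id);               // 27 is now inflight for a closed consumer
        return new ArrayList<Integer>(m.pending);
    }

    public static void main(String[] args) {
        System.out.println("ordered:     " + dispatchThenDeactivate());
        System.out.println("interleaved: " + interleaved());
    }
}
```

In the ordered case the pending list ends up holding 12 through 27. In the
interleaved case it holds only 12 through 26, which matches what the log
below shows. Whether the broker can really reach this interleaving depends on
which locks the two methods hold, and that is exactly what I am asking about.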

*trace log*
-----------------
/### [publisher connection] a message (ID:27) is published and cached
successfully to be directly dispatched./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:16 - addMessageLast: mid=ID:<producer_id>:27
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE o.a.a.b.r.cursors.TopicStorePrefetch - recover: ID:<producer_id>:27, priority: 4

/### [subscriber connection] subscriber stop (by
javax.jms.MessageConsumer#close) kicked in./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.DurableTopicSubscription   - Deactivating keepActive=true, DurableTopicSubscription-cli-0:sub-0, id=ID:<consumer_id>, active=true, destinations=1, total=27, pending=1, dispatched=53, inflight=15, prefetchExtension=0

/### [publisher connection] the cached message (ID:27) was removed from
pending list and dispatched./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=1,cacheEnabled=true,maxBatchSize:16 - remove()
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:16 - fillBatch

/### [subscriber connection] consumer deactivation moved only the 15 inflight
messages (ID:12 - ID:26) back to the pending list but missed ID:27, which had
just been dispatched./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=0,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:26)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=1,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:25)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=2,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:24)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=3,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:23)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=4,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:22)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=5,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:21)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=6,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:20)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=7,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:19)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=8,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:18)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=9,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:17)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=10,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:16)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=11,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:15)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=12,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:14)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=13,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:13)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=14,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:12)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.broker.region.AbstractRegion   - broker-tcp:__weehomespent-lm:61616 removing consumer: ID:<consumer_id> for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
-----------------


I would greatly appreciate it if any user or ActiveMQ developer could help
figure out whether this is a bug, or let me know if I'm missing something.

Regards,
Jaewoong


