Hi, I've run into a problem where a persistent message is never dispatched to a durable subscriber (it simply goes missing), especially when the subscriber reconnects (restarts). Below are the test environment, the scenario, and the problem.
*test env*
ActiveMQ 5.7.0 / JDK 1.6. All configuration is left at the defaults, except that persistence is disabled: BrokerService#setPersistent(false)

*test purpose*
Verify that persistent messages are dispatched correctly to a durable subscriber even when the subscriber's connection is intermittent.

*test scenario*
1. A durable subscriber is created on a topic.
2. A publisher is created and publishes 30 messages (m1, m2, ..., m30) to the topic.
3. The durable subscriber reconnects (resumes) after every 2 messages it consumes, so it should reconnect 15 times before it has consumed all 30 messages.

*problem*
The durable subscriber sporadically misses a message (at step 3 above). For example, it receives m6 right after m4 (m5 is missing). Judging from the trace-level logs of the org.apache.activemq.broker.region.cursors package (see below), it looks like the deactivating thread (triggered by javax.jms.MessageConsumer#close) moves only the inflight messages back to the pending list (in DurableTopicSubscription#deactivate) and misses a message that is being dispatched at that moment. This seems to happen when DurableTopicSubscription#dispatchPending is called by the publishing thread at the same time.
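To make the interleaving I suspect easier to follow, here is a toy model of it. This is not ActiveMQ code: ToySubscription, pollPending, markDispatched and DeactivateRaceSketch are names I made up for illustration, and the two-step dispatch is my assumption about what happens, based only on the log order. The interleaving is replayed deterministically on one thread, so it shows the effect rather than reproducing the real race.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a durable subscription. Hypothetical, NOT ActiveMQ code. */
class ToySubscription {
    final Deque<Integer> pending = new ArrayDeque<Integer>();
    final List<Integer> dispatched = new ArrayList<Integer>();

    /** Dispatch, step 1 (assumed): take the next message off the pending list. */
    Integer pollPending() {
        return pending.pollFirst();
    }

    /** Dispatch, step 2 (assumed): record the message as in flight. */
    void markDispatched(Integer id) {
        dispatched.add(id);
    }

    /** Deactivation: push in-flight messages back to the front of pending, newest first. */
    void deactivate() {
        for (int i = dispatched.size() - 1; i >= 0; i--) {
            pending.addFirst(dispatched.get(i));
        }
        dispatched.clear();
    }
}

class DeactivateRaceSketch {
    /** Returns the pending list after deactivation for the chosen interleaving. */
    static List<Integer> run(boolean deactivateBetweenSteps) {
        ToySubscription sub = new ToySubscription();
        for (int id = 12; id <= 26; id++) {
            sub.markDispatched(id);          // 15 in-flight messages, as in the log
        }
        sub.pending.addLast(27);             // message 27 has just been published

        Integer taken = sub.pollPending();   // "publisher thread" removes 27 from pending
        if (deactivateBetweenSteps) {
            sub.deactivate();                // "subscriber thread" sees only 12..26
            sub.markDispatched(taken);       // too late: 27 is never moved back
        } else {
            sub.markDispatched(taken);       // dispatch completes first
            sub.deactivate();                // 27 is moved back with the others
        }
        return new ArrayList<Integer>(sub.pending);
    }

    public static void main(String[] args) {
        System.out.println("dispatch completes first: " + run(false));
        System.out.println("close in between:         " + run(true));
    }
}
```

In this model the first case leaves 12 to 27 in the pending list, while the second leaves only 12 to 26, which matches what the trace log below appears to show for ID:27. If the real code behaves like this, making the two dispatch steps and the deactivation mutually exclusive would close the gap, but I have not verified that against the ActiveMQ source.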
*trace log*
-----------------
/### [publisher connection] a message (ID:27) is published and cached successfully to be directly dispatched./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:16 - addMessageLast: mid=ID:<producer_id>:27
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE o.a.a.b.r.cursors.TopicStorePrefetch - recover: ID:<producer_id>:27, priority: 4
/### [subscriber connection] subscriber stop (by javax.jms.MessageConsumer#close) kicked in./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.DurableTopicSubscription - Deactivating keepActive=true, DurableTopicSubscription-cli-0:sub-0, id=ID:<consumer_id>, active=true, destinations=1, total=27, pending=1, dispatched=53, inflight=15, prefetchExtension=0
/### [publisher connection] the cached message (ID:27) was removed from the pending list and dispatched./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=1,cacheEnabled=true,maxBatchSize:16 - remove()
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:16 - fillBatch
/### [subscriber connection] consumer deactivation moved only the 15 inflight messages (ID:12 - ID:26) back to the pending list and missed ID:27, which had just been dispatched./
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=0,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:26)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=1,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:25)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=2,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:24)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=3,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:23)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=4,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:22)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=5,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:21)
20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=6,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:20)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=7,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:19)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=8,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:18)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=9,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:17)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=10,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:16)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=11,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:15)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=12,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:14)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=13,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:13)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0) ID:<consumer_id> - org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0:<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=14,cacheEnabled=false,maxBatchSize:16 - addMessageFirst(mid=ID:<producer_id>:12)
20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG o.a.a.broker.region.AbstractRegion - broker-tcp:__weehomespent-lm:61616 removing consumer: ID:<consumer_id> for destination: ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
-----------------

I'd greatly appreciate it if any user or ActiveMQ developer could help figure out whether this is a bug, or let me know if I'm missing something.

Regards,
Jaewoong

--
View this message in context: http://activemq.2283324.n4.nabble.com/persistent-message-missing-to-a-durable-subscriber-when-it-reconnects-restarts-tp4665130.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.