Can you give it a try on a recent snapshot? https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.9-SNAPSHOT/
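When re-running the test against the snapshot, a small check like the one below could make a dropped message easy to spot, such as the m5 that goes missing between m4 and m6 in the report quoted below. It is only a sketch in plain Java: GapCheck and missing are hypothetical names, not part of ActiveMQ, and it assumes the test numbers its messages 1..30 and records the numbers the subscriber receives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for the test harness; not an ActiveMQ class.
class GapCheck {

    // Returns every sequence number in 1..expected that never shows up in
    // the list of numbers the subscriber reported receiving.
    static List<Integer> missing(List<Integer> received, int expected) {
        boolean[] seen = new boolean[expected + 1];
        for (int n : received) {
            if (n >= 1 && n <= expected) {
                seen[n] = true;
            }
        }
        List<Integer> gaps = new ArrayList<Integer>();
        for (int n = 1; n <= expected; n++) {
            if (!seen[n]) {
                gaps.add(n);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Same shape as the reported failure: m6 arrives after m4.
        List<Integer> received = Arrays.asList(1, 2, 3, 4, 6);
        System.out.println("missing: " + missing(received, 6));
    }
}
```

An empty result after all 15 reconnects would suggest the snapshot no longer drops messages in this scenario; because the loss is sporadic, several runs are probably needed before drawing a conclusion.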
On Mon, Mar 25, 2013 at 11:39 PM, Jaewoong Choi <jaewoong.c...@ymail.com> wrote:
> Hi,
>
> I encountered a problem where a persistent message is not dispatched to a
> durable subscriber and goes missing, especially when the subscriber
> reconnects (restarts). Here are the test environment, the scenario and the
> problem.
>
> *test env*
> ActiveMQ 5.7.0 / JDK 1.6
> All configuration is left at its defaults, except that persistence is
> disabled: BrokerService#setPersistent(false)
>
> *test purpose*
> Test whether persistent messages are dispatched properly to a durable
> subscriber even when the subscriber's connection is intermittent.
>
> *test scenario*
> 1. A durable subscriber is created on a topic.
> 2. A publisher is created and publishes 30 messages (m1, m2 .. m30) to the
> topic.
> 3. The durable subscriber reconnects/resumes after every 2 messages it
> consumes. So it is expected to reconnect 15 times before it has consumed
> all 30 messages.
>
> *problem*
> The durable subscriber sporadically misses a message (at step 3 above).
> For example, it receives m6 after m4 (m5 is missing).
>
> Per the trace-level logs from the org.apache.activemq.broker.region.cursors
> package (see below), it looks like the deactivating thread (triggered by
> javax.jms.MessageConsumer#close) does not move pending messages back to the
> pending list properly and handles only the inflight messages (at
> DurableTopicSubscription#deactivate), especially when
> DurableTopicSubscription#dispatchPending is called by the publishing thread
> at the same time.
>
> *trace log*
> -----------------
> /### [publisher connection] a message (ID:27) is published and cached
> successfully to be directly dispatched./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:16
> - addMessageLast: mid=ID:<producer_id>:27
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE
> o.a.a.b.r.cursors.TopicStorePrefetch - recover: ID:<producer_id>:27,
> priority: 4
>
> /### [subscriber connection] subscriber stop (by
> javax.jms.MessageConsumer#close) kicked in./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.DurableTopicSubscription - Deactivating keepActive=true,
> DurableTopicSubscription-cli-0:sub-0, id=ID:<consumer_id>, active=true,
> destinations=1, total=27, pending=1, dispatched=53, inflight=15,
> prefetchExtension=0
>
> /### [publisher connection] the cached message (ID:27) was removed from
> pending list and dispatched./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=1,cacheEnabled=true,maxBatchSize:16
> - remove()
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxBatchSize:16
> - fillBatch
>
> /### [subscriber connection] consumer deactivation only moved 15 inflight
> messages (ID:12 - ID:26) back to pending list but missed (ID:27) which is
> just dispatched./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=0,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:26)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=1,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:25)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=2,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:24)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=3,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:23)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=4,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:22)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=5,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:21)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=6,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:20)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=7,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:19)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=8,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:18)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=9,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:17)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=10,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:16)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=11,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:15)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=12,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:14)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=13,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:13)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=14,cacheEnabled=false,maxBatchSize:16
> - addMessageFirst(mid=ID:<producer_id>:12)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.broker.region.AbstractRegion - broker-tcp:__weehomespent-lm:61616
> removing consumer: ID:<consumer_id> for destination:
> ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
> -----------------
>
> I'd greatly appreciate it if any user or ActiveMQ developer could help
> figure out whether this is a bug, or let me know if I'm missing something.
>
> Regards,
> Jaewoong
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/persistent-message-missing-to-a-durable-subscriber-when-it-reconnects-restarts-tp4665130.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
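
For readers following the trace, the suspected interleaving can be modelled in isolation. The sketch below is a toy model in plain Java, not the broker's code: ToySubscription, RaceDemo and all their methods are hypothetical names, and the two-step "snapshot, then requeue" deactivation is an assumption drawn from the annotations in the quoted trace rather than from the ActiveMQ sources. The interleaving is replayed deterministically on one thread so the outcome does not depend on timing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy stand-in for a durable subscription; NOT ActiveMQ's implementation.
class ToySubscription {
    final Deque<Integer> pending = new ArrayDeque<Integer>();   // waiting for dispatch
    final List<Integer> dispatched = new ArrayList<Integer>();  // sent, not yet acked

    // Publisher side: move the head of the pending list to dispatched.
    void dispatchPending() {
        Integer id = pending.pollFirst();
        if (id != null) {
            dispatched.add(id);
        }
    }

    // Deactivation, step 1: copy the messages currently in flight.
    List<Integer> snapshotInflight() {
        return new ArrayList<Integer>(dispatched);
    }

    // Deactivation, step 2: put the copied messages back at the head of the
    // pending list (newest first, as in the trace) and forget dispatched.
    void requeue(List<Integer> snapshot) {
        for (int i = snapshot.size() - 1; i >= 0; i--) {
            pending.addFirst(snapshot.get(i));
        }
        dispatched.clear();
    }
}

class RaceDemo {
    // Replays the scenario and returns the pending list after deactivation.
    // When racing is true, a dispatch slips in between the two deactivation steps.
    static List<Integer> run(boolean racing) {
        ToySubscription sub = new ToySubscription();
        for (int id = 12; id <= 26; id++) {
            sub.dispatched.add(id);          // 15 messages in flight
        }
        sub.pending.addLast(27);             // ID:27 cached, awaiting dispatch

        List<Integer> snapshot = sub.snapshotInflight();
        if (racing) {
            sub.dispatchPending();           // ID:27 leaves pending after the snapshot
        }
        sub.requeue(snapshot);               // only ID:12..26 are put back
        return new ArrayList<Integer>(sub.pending);
    }

    public static void main(String[] args) {
        System.out.println("no race: " + run(false));
        System.out.println("race:    " + run(true));
    }
}
```

In this model ID:27 survives only when nothing runs between the snapshot and the requeue, which is why making the two steps atomic with respect to dispatch (for example, under one shared lock) would be the obvious thing to look for in a fix. Whether the real broker behaves this way is exactly what the retest on the snapshot should show.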