Can you give it a try on a recent snapshot?
https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.9-SNAPSHOT/



On Mon, Mar 25, 2013 at 11:39 PM, Jaewoong Choi <jaewoong.c...@ymail.com>wrote:

> Hi,
>
> I encountered a problem that a persistence message is not dispatched but
> missing to a durable subscriber, especially when the subscriber
> reconnects(restarts).  Here's the test environment, the scenario and the
> problem.
>
> *test env*
> ActiveMQ 5.7.0/jdk 1.6
> All configurations are not touched to be default but only persistent is
> disabled: BrokerService#setPersistent(false)
>
> *test purpose*
> test how persistent messages are dispatched properly to a durable
> subscriber
> even when the durable subscriber has intermittent connection situation.
>
> *test scenario*
> 1. A durable subscriber is created to a topic.
> 2. A publisher is created and publishes 30 (m1,m2 .. m30) messages to a
> topic.
> 3. The durable subscriber reconnects/resume every after consuming 2
> messages.  So, it's supposed to reconnect 15 times until it completes
> consuming all 30 messages.
>
> *problem*
> The durable subscriber is missing a message sporadically (at step 3 above).
> For example, it receives m6 after m4 (m5 is missing).
>
> Per the trace level logs from org.apache.activemq.broker.region.cursors
> package (see below), looks like the deactivating thread (by
> javax.jms.MessageConsumer#close) doesn't move pending messages back to
> pending list properly but does only inflight messages (at
> DurableTopicSubscription#deactivate), especially when
> DurableTopicSubscription#dispatchPending is called by publishing thread
> simultaneously.
>
> *trace log*
> -----------------
> /### [publisher connection] a message (ID:27) is published and cached
> successfully to be directly dispatched./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxB
> atchSize:16 - addMessageLast: mid=ID:<producer_id>:27
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE
> o.a.a.b.r.cursors.TopicStorePrefetch - recover: ID:<producer_id>:27,
> priority: 4
>
> /### [subscriber connection] subscriber stop (by
> javax.jms.MessageConsumer#close) kicked in./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.DurableTopicSubscription   - Deactivating keepActive=true,
> DurableTopicSubscription-cli-0:sub-0, id=ID:<consumer_id>, active=true,
> destinations=1, total=27, pending=1, dispatched=53, inflight=15,
> prefetchExtension=0
>
> /### [publisher connection] the cached message (ID:27) was removed from
> pending list and dispatched./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=1,cacheEnabled=true,maxB
> atchSize:16 - remove()
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49660@61616] TRACE
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true,maxB
> atchSize:16 - fillBatch
>
> /### [subscriber connection] consumer deactivation only moved 15 inflight
> messages (ID:12 - ID:26) back to pending list but missed (ID:27) which is
> just dispatched./
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=0,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:26)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=1,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:25)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=2,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:24)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=3,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:23)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=4,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:22)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=5,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:21)
> 20:46:59.881 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=6,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:20)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=7,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:19)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=8,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:18)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=9,cacheEnabled=false,maxB
> atchSize:16 - addMessageFirst(mid=ID:<producer_id>:17)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=10,cacheEnabled=false,max
> BatchSize:16 - addMessageFirst(mid=ID:<producer_id>:16)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=11,cacheEnabled=false,max
> BatchSize:16 - addMessageFirst(mid=ID:<producer_id>:15)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=12,cacheEnabled=false,max
> BatchSize:16 - addMessageFirst(mid=ID:<producer_id>:14)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=13,cacheEnabled=false,max
> BatchSize:16 - addMessageFirst(mid=ID:<producer_id>:13)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.b.r.c.AbstractStoreCursor      - TopicStorePrefetch(cli-0,sub-0)
> ID:<consumer_id> -
> org.apache.activemq.broker.region.cursors.TopicStorePrefetch@1d667df0
> :<topic_name>,batchResetNeeded=true,storeHasMessages=true,size=14,cacheEnabled=false,max
> BatchSize:16 - addMessageFirst(mid=ID:<producer_id>:12)
> 20:46:59.882 [ActiveMQ Transport: tcp:///10.73.199.7:49670@61616] DEBUG
> o.a.a.broker.region.AbstractRegion   - broker-tcp:__weehomespent-lm:61616
> removing consumer: ID:<consumer_id> for destination:
> ActiveMQ.Advisory.TempQueue,ActiveMQ.Advisory.TempTopic
> -----------------
>
>
> I'll be greatly appreciated if any user or activemq developer can help
> figure out if this is a bug, or let me know if I'm missing something.
>
> Regards,
> Jaewoong
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/persistent-message-missing-to-a-durable-subscriber-when-it-reconnects-restarts-tp4665130.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Reply via email to