Hi Tim, we managed to attach a debugger as requested and the path you
identified does seem to be the one taken by the incorrect update to
ACTIVEMQ_ACKS table in respect of DC1. We took several stack dumps (text
below) at the different debug points you requested in case that also helps.
Note that the code passed through the OracleJDBCAdapter.doSetLastAck method
twice, the first time updating LAST_ACKED_ID for DC1 and the second time, as
you'd sort of expected, updating the LAST_ACKED_ID for DC2 (correct as this
consumer was consuming and the selector matched). We also used the debugger
to inspect the value of bind params on the UPDATE statement each time to
confirm which DC1 was being updated in which order. We further used SQL
developer to identify when the change had occurred. Based on your earlier
comment, I guess we move to creating a JIRA which I'll now do. Never having
done this I assume I just bring the history of this forum topic to the JIRA
background. Let me know please if anything else to do on our end.
Cheers
Jason.
Here are the stack traces in the order they fired and were captured from the
debug session for the thread hitting the breakpoint:
1st BP:
Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35880@61616] (Suspended
(entry into method unmatched in DurableTopicSubscription))
owns: Topic (id=72)
DurableTopicSubscription.unmatched(MessageReference) line: 98
SimpleDispatchPolicy.dispatch(MessageReference,
MessageEvaluationContext,
List<Subscription>) line: 44
Topic.dispatch(ConnectionContext, Message) line: 769
Topic.doMessageSend(ProducerBrokerExchange, Message) line: 554
Topic.send(ProducerBrokerExchange, Message) line: 482
ManagedTopicRegion(AbstractRegion).send(ProducerBrokerExchange, Message)
line: 503
ManagedRegionBroker(RegionBroker).send(ProducerBrokerExchange, Message)
line: 468
ManagedRegionBroker.send(ProducerBrokerExchange, Message) line: 293
SchedulerBroker(BrokerFilter).send(ProducerBrokerExchange, Message)
line:
153
SchedulerBroker.send(ProducerBrokerExchange, Message) line: 195
AdvisoryBroker(BrokerFilter).send(ProducerBrokerExchange, Message) line:
153
CompositeDestinationBroker.send(ProducerBrokerExchange, Message) line:
96
TransactionBroker.send(ProducerBrokerExchange, Message) line: 293
RedeliveryPlugin(MutableBrokerFilter).send(ProducerBrokerExchange,
Message)
line: 158
LoggingBrokerPlugin(MutableBrokerFilter).send(ProducerBrokerExchange,
Message) line: 158
LoggingBrokerPlugin.send(ProducerBrokerExchange, Message) line: 275
BrokerService$6(MutableBrokerFilter).send(ProducerBrokerExchange,
Message)
line: 158
ManagedTransportConnection(TransportConnection).processMessage(Message)
line: 581
ActiveMQTextMessage(ActiveMQMessage).visit(CommandVisitor) line: 768
ManagedTransportConnection(TransportConnection).service(Command) line:
336
TransportConnection$1.onCommand(Object) line: 200
MutexTransport.onCommand(Object) line: 50
WireFormatNegotiator.onCommand(Object) line: 125
InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line:
301
TcpTransport(TransportSupport).doConsume(Object) line: 83
TcpTransport.doRun() line: 233
TcpTransport.run() line: 215
Thread.run() line: 745
2nd BP
Daemon Thread [ActiveMQ Transport: tcp:///127.0.0.1:35875@61616] (Suspended
(breakpoint at line 325 in DurableTopicSubscription))
owns: Object (id=130)
DurableTopicSubscription.acknowledge(ConnectionContext, MessageAck,
MessageReference) line: 325
DurableTopicSubscription(PrefetchSubscription).acknowledge(ConnectionContext,
MessageAck) line: 233
ManagedTopicRegion(AbstractRegion).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 526
ManagedRegionBroker(RegionBroker).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 484
SchedulerBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 88
AdvisoryBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 88
CompositeDestinationBroker(BrokerFilter).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 88
TransactionBroker.acknowledge(ConsumerBrokerExchange, MessageAck) line:
276
RedeliveryPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 98
LoggingBrokerPlugin(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 98
LoggingBrokerPlugin.acknowledge(ConsumerBrokerExchange, MessageAck)
line:
162
BrokerService$6(MutableBrokerFilter).acknowledge(ConsumerBrokerExchange,
MessageAck) line: 98
ManagedTransportConnection(TransportConnection).processMessageAck(MessageAck)
line: 590
MessageAck.visit(CommandVisitor) line: 245
ManagedTransportConnection(TransportConnection).service(Command) line:
336
TransportConnection$1.onCommand(Object) line: 200
MutexTransport.onCommand(Object) line: 50
WireFormatNegotiator.onCommand(Object) line: 125
InactivityMonitor(AbstractInactivityMonitor).onCommand(Object) line:
301
TcpTransport(TransportSupport).doConsume(Object) line: 83
TcpTransport.doRun() line: 233
TcpTransport.run() line: 215
Thread.run() line: 745
3rd BP:
Daemon Thread [Journal checkpoint worker] (Suspended)
OracleJDBCAdapter(DefaultJDBCAdapter).doSetLastAck(TransactionContext,
ActiveMQDestination, XATransactionId, String, String, long, long) line: 526
JDBCTopicMessageStore.acknowledge(ConnectionContext, String, String,
MessageId, MessageAck) line: 86
JournalTopicMessageStore$2.execute() line: 191
JournalMessageStore$3.execute() line: 306
TransactionTemplate.run(Callback) line: 44
JournalTopicMessageStore(JournalMessageStore).checkpoint(Callback) line:
262
JournalTopicMessageStore.checkpoint() line: 182
JournalPersistenceAdapter$5.call() line: 455
JournalPersistenceAdapter$5.call() line: 452
FutureTask<V>.run() line: 262
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1145
ThreadPoolExecutor$Worker.run() line: 615
Thread.run() line: 745
--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html