Jarek, good news, but that exception is ugly. It would be smashing if you could reproduce it in a JUnit test case using just the JMS APIs, but it may be difficult to reproduce.
I think I need to turn the queue test case from AMQ-2149 into a Topic scenario. That may reproduce it.

2009/3/26 Jarosław Pałka <jpa...@gmail.com>

> Gary,
>
> I made the recommended changes. On the embedded broker side everything looks
> fine now. I restarted Jetty (embedded broker) many times and it worked.
>
> What is interesting is that I started to see some exceptions in my other
> application that is listening on the same ActiveMQ server. I think the
> schema below will help to explain my topology:
>
> client ---> topic://OUTGOING (embedded broker with JMS connector) -->
> topic://OUT.subtopic (ActiveMQ server) --> Mule ESB
>
> When a message is sent from the client to Mule ESB, after implementing your
> changes I started to see the following messages:
>
> 19761 [ActiveMQ Session Task] DEBUG org.mule.transport.jms.MultiConsumerJmsMessageReceiver - Message Received from: jms://topic:OUT.*
> 19761 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession - There is no session id on the request using key: ID.
> Generating new session id: 575fc82b-1a0f-11de-8ce7-c7a955038e40
> 19762 [ActiveMQ Session Task] DEBUG org.mule.DefaultMuleSession - dispatching event to service: service.que-outgoing, event is: MuleEvent: 575fc82c-1a0f-11de-8ce7-c7a955038e40, sync=false, stop processing=false, DefaultInboundEndpoint{endpointUri=jms://topic:OUT.*, connector=ActiveMQJmsConnector{this=ad97f5, started=true, initialised=true, name='jmsConnector', disposed=false, numberOfConcurrentTransactedReceivers=4, createMultipleTransactedReceivers=true, connected=true, supportedProtocols=[jms], serviceOverrides=null}, transformer=[JMSMessageToObject{this=170b819, name='JMSMessageToObject', ignoreBadInput=false, returnClass=class java.lang.Object, sourceTypes=[interface javax.jms.Message, interface javax.jms.TextMessage, interface javax.jms.ObjectMessage, interface javax.jms.BytesMessage, interface javax.jms.MapMessage, interface javax.jms.StreamMessage]}, XML2UMMTransformer{this=146ad8b, name='xml2umm', ignoreBadInput=false, returnClass=class java.lang.Object, sourceTypes=[]}, ExtractHeadersTransformer{this=12b9f14, name='extract-headers', ignoreBadInput=false, returnClass=class java.lang.Object, sourceTypes=[]}], name='endpoint.jms.OUT', properties={durableName=cozaciota}, transactionConfig=Transaction{factory=null, action=NEVER, timeout=0}, filter=null, deleteUnacceptedMessages=false, securityFilter=null, synchronous=false, initialState=started, remoteSync=false, remoteSyncTimeout=10000, endpointEncoding=UTF-8}
> 10459 [ActiveMQ Connection Worker: vm://localhost#2] DEBUG org.apache.activemq.ActiveMQConnection - Async exception with no exception listener: java.lang.NullPointerException
> java.lang.NullPointerException
>     at org.apache.activemq.kaha.impl.index.VMIndexLinkedList.remove(VMIndexLinkedList.java:265)
>     at org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:314)
>     at org.apache.activemq.kaha.impl.container.ListContainerImpl.remove(ListContainerImpl.java:695)
>     at org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(KahaTopicReferenceStore.java:148)
>     at org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:163)
>     at org.apache.activemq.store.amq.AMQTopicMessageStore.acknowledge(AMQTopicMessageStore.java:91)
>     at org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:447)
>     at org.apache.activemq.broker.region.DurableTopicSubscription.acknowledge(DurableTopicSubscription.java:223)
>     at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:238)
>     at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
>     at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
>     at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
>     at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>     at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>     at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
>     at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
>     at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>     at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>     at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:205)
>     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
>     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
>
> The application is working correctly; the only problem is this exception in the logs.
> For testing purposes I'm running both the client and Mule ESB under the same JVM.
>
> Jarek
>
> On 26 March 2009 at 14:26, Gary Tully <gary.tu...@gmail.com> wrote:
>
> > Hi Jarek,
> > some similar behaviour is expressed by issue
> > AMQ-2149<https://issues.apache.org/activemq/browse/AMQ-2149>.
> >
> > One aspect of the problem is related to the persistent index used by the
> > Kaha store. You could try using a VM (in memory) index to see if it helps
> > in your case.
> >
> > To configure it, set the persistentIndex attribute to false as follows:
> >
> > broker = ....
> > AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
> > persistenceFactory.setDataDirectory(dataDirFile);
> > persistenceFactory.setPersistentIndex(false);
> > broker.setPersistenceFactory(persistenceFactory);
> >
> > hope this helps,
> > Gary.
> >
> > 2009/3/26 Jarosław Pałka <jpa...@gmail.com>
> >
> > > Sorry, I forgot to copy the embedded broker configuration:
> > >
> > > public abstract class UMMClientConfiguration extends ConfigurationSupport{
> > >
> > >     private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
> > >     private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
> > >
> > >     @Bean
> > >     public BrokerService brokerService() throws Exception {
> > >         XBeanBrokerService broker = new XBeanBrokerService();
> > >         broker.setUseJmx(false);
> > >         broker.setPersistent(true);
> > >         //broker.
> > >         broker.addJmsConnector(incomingTopicConnector());
> > >         broker.addJmsConnector(outgoingTopicConnector());
> > >         broker.addConnector("vm://localhost");
> > >         broker.start();
> > >         return broker;
> > >     }
> > >
> > >     @Bean
> > >     public JmsTopicConnector outgoingTopicConnector()
> > >             throws UnknownHostException, URISyntaxException {
> > >         JmsTopicConnector topicConnector = new JmsTopicConnector();
> > >         topicConnector.setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
> > >         topicConnector.setLocalClientId("local");
> > >         topicConnector.setOutboundTopicBridges(
> > >                 new OutboundTopicBridge[] { outboundTopicBridge() });
> > >         return topicConnector;
> > >     }
> > >
> > >     @Bean
> > >     public TopicConnectionFactory outboundTopicConnectionFactory()
> > >             throws UnknownHostException, URISyntaxException {
> > >         ActiveMQConnectionFactory activeMQConnectionFactory =
> > >                 new ActiveMQConnectionFactory(getAMQBrokerURL());
> > >         activeMQConnectionFactory.setClientID(getClientID());
> > >         return activeMQConnectionFactory;
> > >     }
> > >
> > >     @Bean
> > >     public OutboundTopicBridge outboundTopicBridge() throws URISyntaxException {
> > >         OutboundTopicBridge topicBridge = new OutboundTopicBridge();
> > >         topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
> > >         topicBridge.setOutboundTopicName(getOutboundTopicName());
> > >         topicBridge.setConsumerName(getConsumerName());
> > >         return topicBridge;
> > >     }
> > >
> > >     @Bean
> > >     public JmsTemplate jmsTemplate()
> > >             throws UnknownHostException, URISyntaxException {
> > >         JmsTemplate jmsTemplate = new JmsTemplate(localOutgoingTopicConnectionFactory());
> > >         jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
> > >         jmsTemplate.setDeliveryPersistent(true);
> > >         jmsTemplate.setPubSubDomain(true);
> > >         return jmsTemplate;
> > >     }
> > >
> > >     @Bean(dependsOn = "brokerService")
> > >     public TopicConnectionFactory localOutgoingTopicConnectionFactory()
> > >             throws UnknownHostException, URISyntaxException {
> > >         ActiveMQConnectionFactory activeMQConnectionFactory =
> > >                 new ActiveMQConnectionFactory("vm://localhost");
> > >         activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
> > >         return activeMQConnectionFactory;
> > >     }
> > > }
> > >
> > > Regards,
> > > Jarek
> > >
> > > On 26 March 2009 at 10:01, Jarosław Pałka <jpa...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I am seeing strange behavior from an embedded ActiveMQ 5.2 broker.
> > > >
> > > > I have a web application running under Jetty 6.1.14 (with Spring 2.5.6). I
> > > > have an embedded broker that is connected to the ActiveMQ server through a JMS
> > > > connector. I have two topics, INCOMING and OUTGOING, that retrieve and forward
> > > > messages to the ActiveMQ server.
> > > > I use XBeanBrokerService to configure the embedded broker. The configuration
> > > > of the embedded broker is in the attachment.
> > > >
> > > > Everything runs great until Jetty is restarted; once restarted, the web
> > > > application is able to receive messages but it doesn't send messages.
> > > > I got the following exception logged in the ActiveMQ server:
> > > >
> > > > ERROR RecoveryListenerAdapter - Message id ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from the data store - already dispatched
> > > > ERROR Service - Async error occurred: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> > > > javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> > > >     at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > >     at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > >     at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > >     at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > >     at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >     at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >     at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > >     at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > >     at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > >     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > >     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > >     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > >     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > >     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > >     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > >     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > >     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > >     at java.lang.Thread.run(Thread.java:619)
> > > >
> > > > and a similar message on the web application side during shutdown:
> > > >
> > > > 2009-03-26 09:23:10.975::INFO: Shutdown hook executing
> > > > 2009-03-26 09:23:10.975::INFO: Graceful shutdown selectchannelconnec...@0.0.0.0:8981
> > > > 140421 [ActiveMQ ShutdownHook] INFO org.apache.activemq.broker.BrokerService - ActiveMQ Message Broker (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > > > 140421 [ActiveMQ ShutdownHook] DEBUG org.apache.activemq.broker.BrokerService - Caught exception, must be shutting down: java.lang.IllegalStateException: Shutdown in progress
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.webapp.webappcont...@173831b {/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.webapp.webappcont...@1abab88 {/,/home/jpalka/jetty-6.1.14/webapps/test}
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.handler.contexthand...@b1b4c3 {/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.webapp.webappcont...@5e5f92 {/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.webapp.webappcont...@1d4ab0e {/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.webapp.webappcont...@12a585c {/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.webapp.webappcont...@10f3801 {/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > > > 2009-03-26 09:23:10.987::INFO: Graceful shutdown org.mortbay.jetty.webapp.webappcont...@2606b8 {/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > > > 140444 [ActiveMQ ShutdownHook] INFO org.apache.activemq.network.jms.JmsConnector - JMS Connector Connector:0 Stopped
> > > > 140447 [ActiveMQ ShutdownHook] INFO org.apache.activemq.network.jms.JmsConnector - JMS Connector Connector:1 Stopped
> > > > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG org.apache.activemq.broker.TransportConnection - Stopping connection: vm://localhost#0
> > > > 140452 [VMTransport] DEBUG org.apache.activemq.store.amq.AMQMessageStore - flush starting ...
> > > > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG org.apache.activemq.ActiveMQConnection - Async exception with no exception listener: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> > > > javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> > > >     at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> > > >     at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> > > >     at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> > > >     at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> > > >     at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >     at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> > > >     at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> > > >     at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> > > >     at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> > > >     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> > > >     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> > > >     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> > > >     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> > > >     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> > > >     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> > > >     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> > > >     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> > > >     at java.lang.Thread.run(Thread.java:619)
> > > > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection - Async exception with no exception listener: org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://localhost#1) disposed.
> > > > org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://localhost#1) disposed.
> > > >     at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> > > >     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> > > >     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> > > >     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> > > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> > > >     at java.lang.Thread.run(Thread.java:619)
> > > >
> > > > The only workaround I found is to remove the activemq-data directory on the
> > > > web application side before starting Jetty again.
> > > >
> > > > Can you tell me what I'm doing wrong? I have not seen such behavior with
> > > > ActiveMQ 5.1.
> > > >
> > > > Regards,
> > > > Jarek
> >
> > --
> > http://blog.garytully.com
> >
> > Open Source SOA
> > http://FUSESource.com

--
http://blog.garytully.com

Open Source SOA
http://FUSESource.com