Hi Jarek,
similar behaviour is described in issue
AMQ-2149 <https://issues.apache.org/activemq/browse/AMQ-2149>.

One aspect of the problem is related to the persistent index used by the
Kaha store. You could try using a VM (in memory) index to see if it helps in
your case.

To configure this, set the persistentIndex attribute to false (via
setPersistentIndex) as follows:

        broker = ....
        AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
        persistenceFactory.setDataDirectory(dataDirFile);
        persistenceFactory.setPersistentIndex(false);
        broker.setPersistenceFactory(persistenceFactory);
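
For reference, here is a rough sketch of how that could slot into a broker
set up like the one in your brokerService() method. It is only an
illustration, not a tested configuration: the class name
EmbeddedBrokerSketch, the method createBroker and the dataDir parameter are
made up for the example, the JMS connectors are left out, and it assumes
activemq-core 5.2 is on the classpath.

```java
import java.io.File;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.xbean.XBeanBrokerService;

// Hypothetical helper class, named only for this example.
public class EmbeddedBrokerSketch {

    // Creates a broker similar to the one in brokerService(), but with the
    // AMQ store configured to use a VM (in-memory) index.
    // dataDir is an assumed parameter: the directory the store should use.
    public static BrokerService createBroker(File dataDir) throws Exception {
        AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
        persistenceFactory.setDataDirectory(dataDir);
        // false = do not use the persistent (on-disk) index
        persistenceFactory.setPersistentIndex(false);

        XBeanBrokerService broker = new XBeanBrokerService();
        broker.setUseJmx(false);
        broker.setPersistent(true);
        // Set the factory before the broker is started.
        broker.setPersistenceFactory(persistenceFactory);
        broker.addConnector("vm://localhost");
        // The JMS connectors from the original configuration are omitted here.
        return broker;
    }
}
```

The caller would still need to add the connectors and call start() on the
returned broker, as your existing method does.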

hope this helps,
Gary.

2009/3/26 Jarosław Pałka <jpa...@gmail.com>

> Sorry, I forgot to copy the embedded broker configuration:
>
> public abstract class UMMClientConfiguration extends ConfigurationSupport{
>
>    private static final String OUTGOING_LOCAL_TOPIC = "OUTGOING";
>    private static final String INCOMING_LOCAL_TOPIC = "INCOMING";
>
>    @Bean
>    public BrokerService brokerService() throws Exception {
>        XBeanBrokerService broker = new XBeanBrokerService();
>        broker.setUseJmx(false);
>        broker.setPersistent(true);
>        //broker.
>        broker.addJmsConnector(incomingTopicConnector());
>        broker.addJmsConnector(outgoingTopicConnector());
>        broker.addConnector("vm://localhost");
>        broker.start();
>        return broker;
>    }
>
>    @Bean
>    public JmsTopicConnector outgoingTopicConnector()
>            throws UnknownHostException, URISyntaxException {
>        JmsTopicConnector topicConnector = new JmsTopicConnector();
>        topicConnector
>                .setOutboundTopicConnectionFactory(outboundTopicConnectionFactory());
>        topicConnector.setLocalClientId("local");
>        topicConnector
>                .setOutboundTopicBridges(new OutboundTopicBridge[] { outboundTopicBridge() });
>        return topicConnector;
>    }
>
>    @Bean
>    public TopicConnectionFactory outboundTopicConnectionFactory()
>            throws UnknownHostException, URISyntaxException {
>        ActiveMQConnectionFactory activeMQConnectionFactory =
>                new ActiveMQConnectionFactory(getAMQBrokerURL());
>        activeMQConnectionFactory.setClientID(getClientID());
>        return activeMQConnectionFactory;
>    }
>
>    @Bean
>    public OutboundTopicBridge outboundTopicBridge() throws URISyntaxException {
>        OutboundTopicBridge topicBridge = new OutboundTopicBridge();
>        topicBridge.setLocalTopicName(OUTGOING_LOCAL_TOPIC);
>        topicBridge.setOutboundTopicName(getOutboundTopicName());
>        topicBridge.setConsumerName(getConsumerName());
>        return topicBridge;
>    }
>
>    @Bean
>    public JmsTemplate jmsTemplate() throws UnknownHostException,
>            URISyntaxException {
>        JmsTemplate jmsTemplate = new JmsTemplate(
>                localOutgoingTopicConnectionFactory());
>        jmsTemplate.setDefaultDestinationName(OUTGOING_LOCAL_TOPIC);
>        jmsTemplate.setDeliveryPersistent(true);
>        jmsTemplate.setPubSubDomain(true);
>        return jmsTemplate;
>    }
>
>    @Bean(dependsOn = "brokerService")
>    public TopicConnectionFactory localOutgoingTopicConnectionFactory()
>            throws UnknownHostException, URISyntaxException {
>        ActiveMQConnectionFactory activeMQConnectionFactory =
>                new ActiveMQConnectionFactory("vm://localhost");
>        activeMQConnectionFactory.setClientID(getLocalOutgoingClientID());
>        return activeMQConnectionFactory;
>    }
> }
>
> Regards,
> Jarek
>
> On 26 March 2009 at 10:01, Jarosław Pałka <jpa...@gmail.com> wrote:
>
> > Hi,
> >
> > I am seeing strange behavior with an embedded ActiveMQ 5.2 broker.
> >
> > I have a web application running under Jetty 6.1.14 (with Spring 2.5.6).
> > It has an embedded broker that is connected to an ActiveMQ server through
> > a JMS connector. I have two topics, INCOMING and OUTGOING, that retrieve
> > messages from and forward messages to the ActiveMQ server.
> > I use XBeanBrokerService to configure the embedded broker. The
> > configuration of the embedded broker is in the attachment.
> >
> > Everything runs fine until Jetty is restarted. Once restarted, the web
> > application is able to receive messages but it doesn't send messages. I
> > get the following exception logged on the ActiveMQ server:
> >
> > ERROR RecoveryListenerAdapter        - Message id ID:ubot2-48486-1237589773318-0:0:2291:1:1 could not be recovered from the data store - already dispatched
> > ERROR Service                        - Async error occurred: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> > javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> >         at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> >         at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> >         at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> >         at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> >         at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> >         at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> >         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> >         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> >         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> >         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> >         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> >         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> >         at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> >         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> >         at java.lang.Thread.run(Thread.java:619)
> >
> > and a similar message on the web application side during shutdown:
> >
> > 2009-03-26 09:23:10.975::INFO:  Shutdown hook executing
> > 2009-03-26 09:23:10.975::INFO:  Graceful shutdown selectchannelconnec...@0.0.0.0:8981
> > 140421 [ActiveMQ ShutdownHook] INFO org.apache.activemq.broker.BrokerService  - ActiveMQ Message Broker (localhost, ID:ubot2-45498-1238055651187-0:0) is shutting down
> > 140421 [ActiveMQ ShutdownHook] DEBUG org.apache.activemq.broker.BrokerService  - Caught exception, must be shutting down: java.lang.IllegalStateException: Shutdown in progress
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.webapp.webappcont...@173831b{/test-jndi,/home/jpalka/jetty-6.1.14/contexts/test-jndi.d}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.webapp.webappcont...@1abab88{/,/home/jpalka/jetty-6.1.14/webapps/test}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.handler.contexthand...@b1b4c3{/javadoc,file:/home/jpalka/jetty-6.1.14/javadoc/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.webapp.webappcont...@5e5f92{/umm-gateway,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-gateway.war!/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.webapp.webappcont...@1d4ab0e{/lottoblog,jar:file:/home/jpalka/jetty-6.1.14/webapps/lottoblog.war!/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.webapp.webappcont...@12a585c{/test-jaas,file:/home/jpalka/jetty-6.1.14/webapps/test-jaas/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.webapp.webappcont...@10f3801{/umm-portal,jar:file:/home/jpalka/jetty-6.1.14/webapps/umm-portal.war!/}
> > 2009-03-26 09:23:10.987::INFO:  Graceful shutdown org.mortbay.jetty.webapp.webappcont...@2606b8{/cometd,jar:file:/home/jpalka/jetty-6.1.14/webapps/cometd.war!/}
> > 140444 [ActiveMQ ShutdownHook] INFO org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:0 Stopped
> > 140447 [ActiveMQ ShutdownHook] INFO org.apache.activemq.network.jms.JmsConnector  - JMS Connector Connector:1 Stopped
> > 140449 [ActiveMQ Transport Stopper: vm://localhost#0] DEBUG org.apache.activemq.broker.TransportConnection  - Stopping connection: vm://localhost#0
> > 140452 [VMTransport] DEBUG org.apache.activemq.store.amq.AMQMessageStore  - flush starting ...
> > 140450 [ActiveMQ Connection Worker: tcp://localhost:61616] DEBUG org.apache.activemq.ActiveMQConnection  - Async exception with no exception listener: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> > javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 8, responseRequired = false, ackType = 0, consumerId = ID:ubot2-45498-1238055651187-2:0:2:1, firstMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, lastMessageId = ID:ubot2-52020-1238055648089-0:0:2:1:1, destination = topic://IN.ltbl, transactionId = null, messageCount = 0}
> >         at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:304)
> >         at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:373)
> >         at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
> >         at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
> >         at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
> >         at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
> >         at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
> >         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
> >         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
> >         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> >         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> >         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> >         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> >         at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> >         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> >         at java.lang.Thread.run(Thread.java:619)
> > 140465 [VMTransport] DEBUG org.apache.activemq.ActiveMQConnection  - Async exception with no exception listener: org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://localhost#1) disposed.
> > org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://localhost#1) disposed.
> >         at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:203)
> >         at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> >         at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
> >         at java.lang.Thread.run(Thread.java:619)
> >
> >
> > The only workaround I have found is to remove the activemq-data directory
> > on the web application side before starting Jetty again.
> >
> > Can you tell me what I'm doing wrong? I have not seen such behavior with
> > ActiveMQ 5.1.
> >
> > Regards,
> > Jarek
> >
> >
> >
>



-- 
http://blog.garytully.com

Open Source SOA
http://FUSESource.com
