I might be able to help a little. Here is a blog post I am working on that sets up a message-driven POJO (MDP) through Spring: http://www.baselogic.com/blog/java/testing-activemq-virtualtopics-using-camel-and-junit
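For anyone unfamiliar with the term, a message-driven POJO is a plain class whose method a Spring listener container calls for each incoming message. Here is a minimal sketch, not the class from the blog post: the name RecordingClient is hypothetical, and it assumes the container adapts the bean so that the payload of a text message arrives as a String.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical message-driven POJO: it needs no JMS or Spring types,
// so it can be exercised directly in a unit test.
class RecordingClient {

    // Listener containers may deliver from several threads, so use a thread-safe list.
    private final List<String> received = new CopyOnWriteArrayList<>();

    // Handler method; assumed to be the target of method="onMessage" in the listener config.
    public void onMessage(String text) {
        received.add(text);
    }

    // Read-only snapshot of the payloads seen so far, in arrival order.
    public List<String> received() {
        return Collections.unmodifiableList(new ArrayList<>(received));
    }
}
```

A test can then send a message to the destination and assert on received(), or the handler can be swapped for a mock.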
I have a unit test that starts an embedded ActiveMQ broker, then starts a JMS listener for the MDP:

    <jms:listener-container client-id="jmsContainer1" transaction-manager="transactionManager">
        <jms:listener id="jmsListener1"
                      destination="Consumer.1.VirtualTopic.Table.1"
                      ref="testClient1"
                      method="onMessage" />
    </jms:listener-container>

Then I use mocks to verify that the messages are delivered.

I suggest this because, looking at your stack trace, it seems that the broker might not be started, and/or your MDB has not successfully connected to its destination. Maybe Spring would be an easier route?

---
Thank You…

Mick Knutson, President

BASE Logic, Inc.
Enterprise Architecture, Design, Mentoring & Agile Consulting
p. (866) BLiNC-411: (254-6241-1)
f. (415) 685-4233

Website: http://baselogic.com
Linked IN: http://linkedin.com/in/mickknutson
Vacation Rental: http://tahoe.baselogic.com
---

On Mon, Sep 7, 2009 at 12:08 AM, Neo Wang <wangdong...@hotmail.com> wrote:

> Environment: JBoss 5.1GA, Active MQ 5.2 embedded broker configuration
>
> I want to send a message to a topic when receiving a message in an MDB, but
> it does not succeed every time. Can anyone help me? I have struggled
> with it for about 2 weeks.
>
> It is my sample code:
>
> package com.trading.platform.ejb;
>
> import javax.ejb.MessageDrivenBean;
> import javax.ejb.MessageDrivenContext;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Queue;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.TextMessage;
> import javax.jms.Topic;
> import javax.jms.TopicConnection;
> import javax.jms.TopicConnectionFactory;
> import javax.jms.TopicPublisher;
> import javax.jms.TopicSession;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> import com.trading.platform.context.ServerContext;
> import com.trading.platform.exception.InvokeException;
> import com.trading.platform.exception.SystemException;
> import com.trading.platform.handler.MessageHandler;
> import com.trading.platform.log.ILogger;
> import com.trading.platform.log.Logger;
>
> public class OrderResponseMDB implements MessageDrivenBean, MessageListener {
>     private ILogger logger = Logger.getLogger(this.getClass());
>     private MessageDrivenContext messageDrivenContext;
>
>     private volatile TopicConnectionFactory topicConnnectionFactory;
>     private volatile Topic topic;
>
>     private void getInitialContext() {
>         try {
>             Context context = new InitialContext();
>             topicConnnectionFactory = (TopicConnectionFactory)
>                     context.lookup("java:comp/env/jms/TopicConnectionFactory");
>             //topic = (Topic) context.lookup("java:comp/env/jms/OrderBroadcastTopic");
>             if (logger.isDebugEnabled())
>                 logger.debug("Platform:Publisher Topic Session connnection is created...");
>         } catch (NamingException e) {
>             if (logger.isErrorEnabled())
>                 logger.error("NamingExcetion:" + e);
>             throw new SystemException("System Error", e);
>         }
>     }
>
>     public void onMessage(Message inMessage) {
>
>         TopicConnection topicConn = null;
>         try {
>             if (inMessage instanceof TextMessage) {
>                 TextMessage txtmsg = (TextMessage) inMessage;
>
>                 System.out.println("MESSAGE BEAN3: Message received: index=" + txtmsg.getText());
>                 topicConn = topicConnnectionFactory.createTopicConnection();
>                 topicConn.start();
>                 TopicSession topicSession =
>                         topicConn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
>                 topic = topicSession.createTopic("topic.orderbroadcast");
>
>                 TopicPublisher publisher = topicSession.createPublisher(topic);
>                 TextMessage newMsg = topicSession.createTextMessage("11111");
>                 publisher.send(newMsg);
>
>                 //publisher.close();
>                 //topicSession.close();
>                 topicConn.close();
>             } else {
>                 System.out.println("WRONG MESSAGE TYPE received: " +
>                         inMessage.getClass().getName());
>             }
>         } catch (Throwable t) {
>             System.out.println("error...");
>             t.printStackTrace();
>             messageDrivenContext.setRollbackOnly();
>
>             try {
>                 if (topicConn != null) topicConn.close();
>                 System.out.println("MESSAGE BEAN: Rollback: index=" +
>                         ((TextMessage) inMessage).getText());
>             } catch (Exception e) {
>                 System.out.println("MESSAGE BEAN: Unable to get index property because:" +
>                         e.getMessage());
>                 e.printStackTrace();
>             }
>             throw new RuntimeException(t.getMessage());
>         }
>     }
>
>     public void ejbCreate() {
>         System.out.println("ConnectionFactory created...");
>         getInitialContext();
>     }
>
>     public void ejbRemove() {
>
>     }
>
>     public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
>         this.messageDrivenContext = messageDrivenContext;
>     }
> }
>
> The following is the exception information:
>
> 14:59:09,601 WARN [Service] Async error occurred:
> javax.transaction.xa.XAException: Transaction
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
> javax.transaction.xa.XAException: Transaction
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
>     at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
>     at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
>     at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
>     at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
>     at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
>     at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
>     at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>     at java.lang.Thread.run(Thread.java:595)
> 14:59:09,614 WARN [ActiveMQManagedConnection] Connection failed:
> javax.jms.JMSException: Transaction
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
> 14:59:09,615 WARN [TxConnectionManager] Connection error occured:
> org.jboss.resource.connectionmanager.txconnectionmanager$txconnectioneventliste...@18936ae
> [state=NORMAL
> mc=org.apache.activemq.ra.activemqmanagedconnect...@1365339 handles=0
> lastUse=1252306749595 permit=false trackByTx=false
> mcp=org.jboss.resource.connectionmanager.jbossmanagedconnectionpool$onep...@191c3ce
> context=org.jboss.resource.connectionmanager.internalmanagedconnectionp...@ce1472
> xaresource=org.apache.activemq.ra.activemqmanagedconnectio...@164e48d
> txSync=null]
> javax.jms.JMSException: Transaction
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
>     at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
>     at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1784)
>     at org.apache.activemq.ActiveMQConnection$2$1.run(ActiveMQConnection.java:1705)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>     at java.lang.Thread.run(Thread.java:595)
> Caused by: javax.transaction.xa.XAException: Transaction
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
>     at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
>     at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
>     at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
>     at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
>     at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
>     at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
>     at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>     ... 1 more
>
> --
> View this message in context:
> http://www.nabble.com/How-to-send-one-message-to-a-topic-in-MDB--tp25325925p25325925.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.