I might be able to help a little.
Here is a blog post I am working on that uses a message-driven POJO (MDP)
configured through Spring:
http://www.baselogic.com/blog/java/testing-activemq-virtualtopics-using-camel-and-junit

I have a unit test that starts an embedded ActiveMQ broker, then starts a
JMS listener for the MDP:

<jms:listener-container client-id="jmsContainer1"
                        transaction-manager="transactionManager">

    <jms:listener id="jmsListener1"
                  destination="Consumer.1.VirtualTopic.Table.1"
                  ref="testClient1"
                  method="onMessage" />
</jms:listener-container>

Then I use mocks to verify that the messages are delivered.
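
A hand-written recording stub is one way to make the same check without a
mocking library. The following is only a sketch under stated assumptions: the
class name RecordingClient and its getReceived() accessor are hypothetical, and
it assumes the listener adapter Spring creates for method="onMessage" passes
the text payload of a TextMessage to the method as a String.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the bean referenced as "testClient1" in the
// listener configuration. It only records what it is handed, so a test can
// inspect the delivered messages afterwards.
class RecordingClient {

    // Synchronized because the listener container calls onMessage on its
    // own thread while the test thread reads the results.
    private final List<String> received =
            Collections.synchronizedList(new ArrayList<String>());

    // Assumed to be invoked with the converted payload of each message.
    public void onMessage(String text) {
        received.add(text);
    }

    // Returns a snapshot so callers never see a list that is being modified.
    public List<String> getReceived() {
        synchronized (received) {
            return new ArrayList<String>(received);
        }
    }
}
```

Because delivery happens asynchronously, a test using a stub like this would
normally poll getReceived() with a timeout before asserting on its contents.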

Looking at your stack trace, it seems that the broker might not be
started, and/or your MDB has not successfully connected to its destination.
Maybe Spring would be an easier route?

---
Thank You…

Mick Knutson, President

BASE Logic, Inc.
Enterprise Architecture, Design, Mentoring & Agile Consulting
p. (866) BLiNC-411: (254-6241-1)
f. (415) 685-4233

Website: http://baselogic.com
Linked IN: http://linkedin.com/in/mickknutson
Vacation Rental: http://tahoe.baselogic.com
---



On Mon, Sep 7, 2009 at 12:08 AM, Neo Wang <wangdong...@hotmail.com> wrote:

>
> Environment: JBoss 5.1 GA, ActiveMQ 5.2, embedded broker configuration
>
> I want to send a message to a topic when a message is received in an MDB,
> but it does not always succeed. Can anyone help? I have been struggling
> with this for about two weeks.
>
> Here is my sample code:
>
> package com.trading.platform.ejb;
>
> import javax.ejb.MessageDrivenBean;
> import javax.ejb.MessageDrivenContext;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Queue;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.TextMessage;
> import javax.jms.Topic;
> import javax.jms.TopicConnection;
> import javax.jms.TopicConnectionFactory;
> import javax.jms.TopicPublisher;
> import javax.jms.TopicSession;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> import com.trading.platform.context.ServerContext;
> import com.trading.platform.exception.InvokeException;
> import com.trading.platform.exception.SystemException;
> import com.trading.platform.handler.MessageHandler;
> import com.trading.platform.log.ILogger;
> import com.trading.platform.log.Logger;
>
> public class OrderResponseMDB implements MessageDrivenBean, MessageListener {
>     private ILogger logger = Logger.getLogger(this.getClass());
>     private MessageDrivenContext messageDrivenContext;
>
>     private volatile TopicConnectionFactory topicConnnectionFactory;
>     private volatile Topic topic;
>
>     private void getInitialContext() {
>         try {
>             Context context = new InitialContext();
>             topicConnnectionFactory = (TopicConnectionFactory)
>                     context.lookup("java:comp/env/jms/TopicConnectionFactory");
>             //topic = (Topic)context.lookup("java:comp/env/jms/OrderBroadcastTopic");
>             if (logger.isDebugEnabled())
>                 logger.debug("Platform:Publisher Topic Session connnection is created...");
>         } catch (NamingException e) {
>             if (logger.isErrorEnabled())
>                 logger.error("NamingExcetion:" + e);
>             throw new SystemException("System Error", e);
>         }
>     }
>
>     public void onMessage(Message inMessage) {
>
>         TopicConnection topicConn = null;
>         try {
>             if (inMessage instanceof TextMessage) {
>                 TextMessage txtmsg = (TextMessage) inMessage;
>
>                 System.out.println("MESSAGE BEAN3: Message received: index=" + txtmsg.getText());
>                 topicConn = topicConnnectionFactory.createTopicConnection();
>                 topicConn.start();
>                 TopicSession topicSession =
>                         topicConn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
>                 topic = topicSession.createTopic("topic.orderbroadcast");
>
>                 TopicPublisher publisher = topicSession.createPublisher(topic);
>                 TextMessage newMsg = topicSession.createTextMessage("11111");
>                 publisher.send(newMsg);
>
>                 //publisher.close();
>                 //topicSession.close();
>                 topicConn.close();
>             } else {
>                 System.out.println("WRONG MESSAGE TYPE received: " + inMessage.getClass().getName());
>             }
>         } catch (Throwable t) {
>             System.out.println("error...");
>             t.printStackTrace();
>             messageDrivenContext.setRollbackOnly();
>
>             try {
>                 if (topicConn != null) topicConn.close();
>                 System.out.println("MESSAGE BEAN: Rollback: index=" + ((TextMessage) inMessage).getText());
>             } catch (Exception e) {
>                 System.out.println("MESSAGE BEAN: Unable to get index property because:" + e.getMessage());
>                 e.printStackTrace();
>             }
>             throw new RuntimeException(t.getMessage());
>         }
>     }
>
>     public void ejbCreate() {
>         System.out.println("ConnectionFactory created...");
>         getInitialContext();
>     }
>
>     public void ejbRemove() {
>
>     }
>
>     public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) {
>         this.messageDrivenContext = messageDrivenContext;
>     }
> }
>
> The following is the exception information:
>
> 14:59:09,601 WARN  [Service] Async error occurred: javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
> javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
>        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
>        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
>        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
>        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
>        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
>        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
>        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
>        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
>        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
>        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
>        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>        at java.lang.Thread.run(Thread.java:595)
> 14:59:09,614 WARN  [ActiveMQManagedConnection] Connection failed: javax.jms.JMSException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
> 14:59:09,615 WARN  [TxConnectionManager] Connection error occured: org.jboss.resource.connectionmanager.txconnectionmanager$txconnectioneventliste...@18936ae [state=NORMAL mc=org.apache.activemq.ra.activemqmanagedconnect...@1365339 handles=0 lastUse=1252306749595 permit=false trackByTx=false mcp=org.jboss.resource.connectionmanager.jbossmanagedconnectionpool$onep...@191c3ce context=org.jboss.resource.connectionmanager.internalmanagedconnectionp...@ce1472 xaresource=org.apache.activemq.ra.activemqmanagedconnectio...@164e48d txSync=null]
> javax.jms.JMSException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
>        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
>        at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1784)
>        at org.apache.activemq.ActiveMQConnection$2$1.run(ActiveMQConnection.java:1705)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>        at java.lang.Thread.run(Thread.java:595)
> Caused by: javax.transaction.xa.XAException: Transaction 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663' has not been started.
>        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
>        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
>        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
>        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
>        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
>        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
>        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
>        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
>        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
>        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
>        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
>        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>        ... 1 more
>
>
>
> --
> View this message in context:
> http://www.nabble.com/How-to-send-one-message-to-a-topic-in-MDB--tp25325925p25325925.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
