Environment: JBoss 5.1GA, Active MQ 5.2 embeded broker configuration

I want to send a message to a topic when receiving one message in a MDB, but
it can't be successful all the time, any one can help me? I have struggled
in it about 2 weeks.

It is my sample code:

package com.trading.platform.ejb;

import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import com.trading.platform.context.ServerContext;
import com.trading.platform.exception.InvokeException;
import com.trading.platform.exception.SystemException;
import com.trading.platform.handler.MessageHandler;
import com.trading.platform.log.ILogger;
import com.trading.platform.log.Logger;

public class OrderResponseMDB implements MessageDrivenBean, MessageListener{
        private ILogger logger = Logger.getLogger(this.getClass());
        private MessageDrivenContext messageDrivenContext;
        
        private volatile TopicConnectionFactory topicConnnectionFactory;
        private volatile Topic topic;
        
        private void getInitialContext(){
                try {
                        Context context = new InitialContext();
                        topicConnnectionFactory =
(TopicConnectionFactory)context.lookup("java:comp/env/jms/TopicConnectionFactory");
                        //topic = 
(Topic)context.lookup("java:comp/env/jms/OrderBroadcastTopic");
                        if(logger.isDebugEnabled()) 
logger.debug("Platform:Publisher Topic
Session connnection is created..." );
                } catch (NamingException e) {
                        if(logger.isErrorEnabled()) 
logger.error("NamingExcetion:" + e);
                        throw new SystemException("System Error", e);
                }
        }

         public void onMessage(Message inMessage) {

                 TopicConnection topicConn = null;
                try {
                    if (inMessage instanceof TextMessage) {
                        TextMessage txtmsg = (TextMessage)inMessage;       
                       
                        System.out.println("MESSAGE BEAN3: Message received:
index="+txtmsg.getText());
                        topicConn =
topicConnnectionFactory.createTopicConnection();
                                topicConn.start();
                                TopicSession topicSession = 
topicConn.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
                                topic = 
topicSession.createTopic("topic.orderbroadcast");
                                
                                TopicPublisher publisher = 
topicSession.createPublisher(topic);
                                TextMessage newMsg = 
topicSession.createTextMessage("11111");
                                publisher.send(newMsg);
                                
                                //publisher.close();
                                //topicSession.close();
                                topicConn.close();
                    } else {
                        System.out.println("WRONG MESSAGE TYPE received: " +
inMessage.getClass().getName());
                    }
                } catch (Throwable t) {
                        System.out.println("error...");
                    t.printStackTrace();
                    messageDrivenContext.setRollbackOnly();
                    
                    try {
                        if (topicConn != null)  topicConn.close();
                        System.out.println("MESSAGE BEAN: Rollback:
index="+((TextMessage)inMessage).getText());
                    } catch (Exception e) {
                        System.out.println("MESSAGE BEAN: Unable to get index
property because:"+e.getMessage());
                        e.printStackTrace();
                        
                    }
                    throw new RuntimeException(t.getMessage());
                }
         }

        
        public void ejbCreate(){
                System.out.println("ConnectionFactory created...");
                getInitialContext();
        }
        
        public void ejbRemove(){
                
        }
        
        public void setMessageDrivenContext(MessageDrivenContext
messageDrivenContext){
                this.messageDrivenContext = messageDrivenContext;
        }
}

The following is the exception information:

14:59:09,601 WARN  [Service] Async error occurred:
javax.transaction.xa.XAException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
javax.transaction.xa.XAException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
        at
org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
        at
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
        at
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
        at
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
        at
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
        at
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
        at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
        at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
        at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
        at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        at java.lang.Thread.run(Thread.java:595)
14:59:09,614 WARN  [ActiveMQManagedConnection] Connection failed:
javax.jms.JMSException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
14:59:09,615 WARN  [TxConnectionManager] Connection error occured:
org.jboss.resource.connectionmanager.txconnectionmanager$txconnectioneventliste...@18936ae[state=normal
mc=org.apache.activemq.ra.activemqmanagedconnect...@1365339 handles=0
lastUse=1252306749595 permit=false trackByTx=false
mcp=org.jboss.resource.connectionmanager.jbossmanagedconnectionpool$onep...@191c3ce
context=org.jboss.resource.connectionmanager.internalmanagedconnectionp...@ce1472
xaresource=org.apache.activemq.ra.activemqmanagedconnectio...@164e48d
txSync=null]
javax.jms.JMSException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
        at
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
        at
org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1784)
        at
org.apache.activemq.ActiveMQConnection$2$1.run(ActiveMQConnection.java:1705)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)
Caused by: javax.transaction.xa.XAException: Transaction
'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
has not been started.
        at
org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
        at
org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
        at
org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
        at
org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
        at
org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
        at
org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
        at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
        at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
        at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
        at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
        at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
        ... 1 more



-- 
View this message in context: 
http://www.nabble.com/How-to-send-one-message-to-a-topic-in-MDB--tp25325925p25325925.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to