Hi, I am trying to write a standalone JMS host that can transactionally receive messages from ActiveMQ using JTA. I have implemented ServerSessionPool and ServerSession, and created a ConnectionConsumer from the JMS Connection by passing in an instance of my ServerSessionPool implementation. However, I am unable to receive any messages despite starting the connection (the start method on the ServerSession implementation doesn't seem to get called).
Any pointers would be highly appreciated, 1. TransactionHandler public interface TransactionHandler { void begin(Session session) throws JmsTxException; void commit(Session session) throws JmsTxException; void rollback(Session session) throws JmsTxException; } 2. JtaTransactionHandler public class JtaTransactionHandler implements TransactionHandler { private TransactionManager transactionManager; public void setTransactionManager(TransactionManager transactionManager) { this.transactionManager = transactionManager; } public void begin(Session session) throws JmsTxException { try { transactionManager.begin(); if(!(session instanceof XASession)) { throw new JmsTxException("XA session required for global transactions"); } XASession xaSession = (XASession) session; XAResource xaResource = xaSession.getXAResource(); transactionManager.getTransaction().enlistResource(xaResource); } catch (Exception e) { throw new JmsTxException(e); } } public void commit(Session session) throws JmsTxException { try { transactionManager.commit(); } catch (Exception e) { throw new JmsTxException(e); } } public void rollback(Session session) throws JmsTxException { try { transactionManager.rollback(); } catch (Exception e) { throw new JmsTxException(e); } } } 3. 
ServerSession implementation public class StandaloneServerSession implements ServerSession { private StandaloneServerSessionPool serverSessionPool; private Session session; private TransactionHandler transactionHandler; public StandaloneServerSession(Session session, StandaloneServerSessionPool serverSessionPool, TransactionHandler transactionHandler) { this.session = session; this.serverSessionPool = serverSessionPool; this.transactionHandler = transactionHandler; } public Session getSession() throws JMSException { return session; } public void start() throws JMSException { try { transactionHandler.begin(session); session.run(); transactionHandler.commit(session); } catch(RuntimeException ex) { transactionHandler.rollback(session); throw ex; } finally { serverSessionPool.returnSession(this); } } } 4. ServerSessionPool implementation public class StandaloneServerSessionPool implements ServerSessionPool { // Available server sessions private Stack<ServerSession> serverSessions = new Stack<ServerSession>(); /** * Initializes the server sessions. * @param serverSessions Server sessions. */ public StandaloneServerSessionPool(List<Session> sessions, TransactionHandler transactionHandler) { for(Session session : sessions) { ServerSession serverSession = new StandaloneServerSession(session, this, transactionHandler); this.serverSessions.push(serverSession); } } public void stop() throws JMSException { ServerSession serverSession = null; while((serverSession = getServerSession()) != null) { serverSession.getSession().close(); } } public ServerSession getServerSession() throws JMSException { synchronized (serverSessions) { while(serverSessions.isEmpty()) { try { serverSessions.wait(); } catch (InterruptedException e) { throw new JMSException("Unable to get a server session"); } } return serverSessions.pop(); } } protected void returnSession(ServerSession serverSession) { synchronized (serverSessions) { serverSessions.push(serverSession); serverSessions.notify(); } } } 6. 
Code to create te connection consumer public void registerListener(Destination destination, ConnectionFactory connectionFactory, List<MessageListener> listeners, TransactionType transactionType) { try { connection = connectionFactory.createConnection(); List<Session> sessions = new LinkedList<Session>(); for (MessageListener listener : listeners) { boolean transacted = transactionType != TransactionType.GLOBAL; Session session = connection.createSession(transacted, Session.SESSION_TRANSACTED); session.setMessageListener(listener); sessions.add(session); } TransactionHandler transactionHandler = transactionHandlers.get(transactionType); StandaloneServerSessionPool serverSessionPool = new StandaloneServerSessionPool(sessions, transactionHandler); ConnectionConsumer connectionConsumer = connection.createConnectionConsumer(destination, null, serverSessionPool, 1); connection.start(); } catch (JMSException ex) { throw new RuntimeException("Unable to activate service", ex); } } Kind Regards Meeraj -- View this message in context: http://www.nabble.com/Connection-Consumer-and-Server-Session-Pools-tf4446061s2354.html#a12686031 Sent from the ActiveMQ - User mailing list archive at Nabble.com.