I did not see this problem with the admittedly inefficient test class attached (which is only the consumer part).
Thanks, Aaron On Fri, May 16, 2008 at 6:17 PM, jydev <[EMAIL PROTECTED]> wrote: > > Hello, > > Getting the following error with VirtualTopic on 5.1, when there are more > than one topic subscribers. > > ERROR Service - Async error occurred: > java.lang.ClassCastException: org.apache.activemq.broker.region.Topic cannot > be cast to org.apache.activemq.broker.region.Queue > java.lang.ClassCastException: org.apache.activemq.broker.region.Topic cannot > be cast to org.apache.activemq.broker.region.Queue > at > org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:50) > at > org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:224) > at > org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:364) > at > org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:470) > at > org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73) > at > org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:84) > at > org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:443) > at org.apache.activemq.command.MessageAck.visit(MessageAck.java:196) > at > org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143) > at > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183) > at java.lang.Thread.run(Thread.java:619) > > This seems to be already captured as an issue: > https://issues.apache.org/activemq/browse/AMQ-1687 > > Any idea when there will be a patch for this? Is there a work around? > > It seems like the messages are getting to the listeners ok even though > broker is spitting out the errors above. But I want to make sure that there > would be no weird side-effects due to the error. > > Thanks you in advance > jydev > -- > View this message in context: > http://www.nabble.com/ClassCastException-with-VirtualTopic-on-5.1-tp17285256s2354p17285256.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > > >
package training; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Connection; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.MessageConsumer; import javax.jms.Destination; import org.apache.activemq.ActiveMQConnectionFactory; /** * Used to start clients to read a number of messages from a topic or queue. */ public class GenericConsumer { private final static String DEFAULT_ACTIVEMQ_URL = "tcp://localhost:61616"; private ConnectionFactory factory; public void initialize() { factory = new ActiveMQConnectionFactory(DEFAULT_ACTIVEMQ_URL); } public void receiveMessage(String destName, boolean isQueue, String clientID) throws JMSException { Connection connection = null; Session session = null; MessageConsumer consumer = null; try { connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination dest = isQueue ? session.createQueue(destName) : session.createTopic(destName); consumer = session.createConsumer(dest); connection.start(); TextMessage message = (TextMessage) consumer.receive(5000); if(message != null) { System.out.println(clientID+" Received message on destination "+destName+" with ID "+message.getJMSMessageID()); } else { System.out.println(clientID+" Consumer timed out on destination "+destName+"; no message received."); } connection.stop(); } finally { if(consumer != null) try {consumer.close();}catch(JMSException e) {} if(session != null) try {session.close();}catch(JMSException e) {} if(connection != null) try {connection.close();}catch(JMSException e) {} } } /** * Starts a new thread to process messages from the given destination. */ public static void launchConsumer(final String destName, final boolean isQueue, final String clientID, final int messageCount) { Thread t = new Thread() { public void run() { GenericConsumer consumer = new GenericConsumer(); consumer.initialize(); System.out.println(clientID+" consumer started."); try { for(int i=0; i<messageCount; i++) { consumer.receiveMessage(destName, isQueue, clientID); } } catch (Exception e) { e.printStackTrace(); } System.out.println(clientID+" consumer FINISHED."); } }; t.start(); } public static void main(String[] args) { // Start 2 consumers to read off the queue normally launchConsumer("Consumer.Foo.VirtualTopic.Test", true, "Client 1", 10); launchConsumer("Consumer.Foo.VirtualTopic.Test", true, "Client 2", 20); // Start 2 consumers to read off the queue normally launchConsumer("Consumer.Bar.VirtualTopic.Test", true, "ClientB1", 10); launchConsumer("Consumer.Bar.VirtualTopic.Test", true, "ClientB2", 20); } }