I did not see this problem with the admittedly inefficient test class
attached (which is only the consumer part).

Thanks,
       Aaron

On Fri, May 16, 2008 at 6:17 PM, jydev <[EMAIL PROTECTED]> wrote:
>
> Hello,
>
> I am getting the following error with VirtualTopic on 5.1 when there is more
> than one topic subscriber.
>
> ERROR Service                        - Async error occurred:
> java.lang.ClassCastException: org.apache.activemq.broker.region.Topic cannot be cast to org.apache.activemq.broker.region.Queue
> java.lang.ClassCastException: org.apache.activemq.broker.region.Topic cannot be cast to org.apache.activemq.broker.region.Queue
>        at org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:50)
>        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:224)
>        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:364)
>        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:470)
>        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
>        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
>        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
>        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:84)
>        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:443)
>        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:196)
>        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
>        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
>        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
>        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
>        at java.lang.Thread.run(Thread.java:619)
>
> This seems to be already captured as an issue:
> https://issues.apache.org/activemq/browse/AMQ-1687
>
> Any idea when there will be a patch for this?  Is there a workaround?
>
> It seems like the messages are getting to the listeners OK even though the
> broker is logging the errors above.  But I want to make sure that there
> will be no unexpected side effects due to the error.
>
> Thank you in advance
> jydev
> --
> View this message in context: 
> http://www.nabble.com/ClassCastException-with-VirtualTopic-on-5.1-tp17285256s2354p17285256.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
>
package training;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.MessageConsumer;
import javax.jms.Destination;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Used to start clients to read a number of messages from a topic or queue.
 */
public class GenericConsumer {
    private final static String DEFAULT_ACTIVEMQ_URL = "tcp://localhost:61616";
    private ConnectionFactory factory;

    public void initialize() {
        factory = new ActiveMQConnectionFactory(DEFAULT_ACTIVEMQ_URL);
    }

    public void receiveMessage(String destName, boolean isQueue, String clientID) throws JMSException {
        Connection connection = null;
        Session session = null;
        MessageConsumer consumer = null;
        try {
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest = isQueue ? session.createQueue(destName) : session.createTopic(destName);
            consumer = session.createConsumer(dest);
            connection.start();
            TextMessage message = (TextMessage) consumer.receive(5000);
            if(message != null) {
                System.out.println(clientID+" Received message on destination "+destName+" with ID "+message.getJMSMessageID());
            } else {
                System.out.println(clientID+" Consumer timed out on destination "+destName+"; no message received.");
            }
            connection.stop();
        } finally {
            if(consumer != null) try {consumer.close();}catch(JMSException e) {}
            if(session != null) try {session.close();}catch(JMSException e) {}
            if(connection != null) try {connection.close();}catch(JMSException e) {}
        }
    }

    /**
     * Starts a new thread to process messages from the given destination.
     */
    public static void launchConsumer(final String destName, final boolean isQueue, final String clientID, final int messageCount) {
        Thread t = new Thread() {
            public void run() {
                GenericConsumer consumer = new GenericConsumer();
                consumer.initialize();
                System.out.println(clientID+" consumer started.");
                try {
                    for(int i=0; i<messageCount; i++) {
                        consumer.receiveMessage(destName, isQueue, clientID);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(clientID+" consumer FINISHED.");
            }
        };
        t.start();
    }

    public static void main(String[] args) {
        // Start 2 consumers to read off the queue normally
        launchConsumer("Consumer.Foo.VirtualTopic.Test", true, "Client 1", 10);
        launchConsumer("Consumer.Foo.VirtualTopic.Test", true, "Client 2", 20);
        // Start 2 more consumers on a second consumer queue for the same virtual topic
        launchConsumer("Consumer.Bar.VirtualTopic.Test", true, "ClientB1", 10);
        launchConsumer("Consumer.Bar.VirtualTopic.Test", true, "ClientB2", 20);
    }

}
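
The queue names used in main() above follow what I understand to be the broker's default virtual-topic naming: a topic named VirtualTopic.<name> is consumed through queues named Consumer.<consumerName>.VirtualTopic.<name>. A minimal sketch of that mapping, assuming the default naming has not been changed in the broker configuration (VirtualTopicNames and consumerQueue are hypothetical helper names, not part of ActiveMQ):

```java
// Hypothetical helper, not an ActiveMQ API: builds the consumer queue name
// for a virtual topic, assuming the default "VirtualTopic." / "Consumer." prefixes.
final class VirtualTopicNames {
    private static final String TOPIC_PREFIX = "VirtualTopic.";
    private static final String QUEUE_PREFIX = "Consumer.";

    private VirtualTopicNames() {}

    static String consumerQueue(String consumerName, String topicName) {
        // The consumer name occupies a single path element, so it must not contain dots.
        if (consumerName == null || consumerName.isEmpty() || consumerName.contains(".")) {
            throw new IllegalArgumentException("consumer name must be a single non-empty path element");
        }
        // The topic must carry the virtual-topic prefix followed by at least one character.
        if (topicName == null || !topicName.startsWith(TOPIC_PREFIX)
                || topicName.length() == TOPIC_PREFIX.length()) {
            throw new IllegalArgumentException("topic name must start with " + TOPIC_PREFIX);
        }
        return QUEUE_PREFIX + consumerName + "." + topicName;
    }

    public static void main(String[] args) {
        // Prints the two queue names used by GenericConsumer.main().
        System.out.println(consumerQueue("Foo", "VirtualTopic.Test"));
        System.out.println(consumerQueue("Bar", "VirtualTopic.Test"));
    }
}
```

Each distinct consumer name (Foo, Bar) gets its own queue, so clients sharing a name compete for messages while each name as a whole receives its own copy.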
