Thank you for the feedback. 

I have created a simple test case for this based on one of the ActiveMQ basic 
examples (attached to this mail - App.java).

I create two producers: one sends messages to the virtual topic VirtualTopic.> 
and the other sends messages to the virtual topic VirtualTopic.MIGRATION.

I also create two consumers: one consumes from the queue 
Consumer.ConsumerA.VirtualTopic.> and the other consumes from the queue 
Consumer.ConsumerB.VirtualTopic.MIGRATION.
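
For reference, these queue names follow the pattern used by the Consumer class 
in the attached App.java. A minimal standalone sketch (the class name 
QueueNames and the helper consumerQueue are only illustrative; they are not 
part of the attachment):

```java
public class QueueNames {

    // Mirrors the String.format call in App.java's Consumer class:
    // "Consumer.<consumerName>.<topicName>".
    // Hypothetical helper, introduced only for this illustration.
    public static String consumerQueue(String consumerName, String topicName) {
        return String.format("Consumer.%s.%s", consumerName, topicName);
    }

    public static void main(String[] args) {
        // Prints the two queue names used in the test case.
        System.out.println(consumerQueue("ConsumerA", "VirtualTopic.>"));
        System.out.println(consumerQueue("ConsumerB", "VirtualTopic.MIGRATION"));
    }
}
```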

Sending one message to each virtual topic, I would expect output like this:

    Consumer 'ConsumerA' received: This message was from topic 'VirtualTopic.>'.
    Consumer 'ConsumerB' received: This message was from topic 'VirtualTopic.MIGRATION'.

But I get something like this (the messages are replicated to all queues):

    Consumer 'ConsumerA' received: This message was from topic 'VirtualTopic.>'.
    Consumer 'ConsumerB' received: This message was from topic 'VirtualTopic.>'.
    Consumer 'ConsumerA' received: This message was from topic 'VirtualTopic.MIGRATION'.
    Consumer 'ConsumerB' received: This message was from topic 'VirtualTopic.MIGRATION'.

I tested this using the default configuration with ActiveMQ versions 5.9.0 and 
5.13.2.

You mentioned that you were able to get this behavior with a sample. I would be 
very grateful if you could share that sample :)

Regards, 

On Tuesday, 15 March 2016 at 13:36 -0600, Quinn Stevenson wrote:
> Is there a specific reason the default configuration won’t work for you?  
> (Either don’t set anything, or explicitly set the default of
> <virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>
> 
> I tried a simple sample using the defaults, and it seems to accomplish what 
> you’re after.
> 
> > On Mar 14, 2016, at 11:05 AM, Nuno Oliveira 
> > <nuno.olive...@geo-solutions.it> wrote:
> > 
> > <virtualTopic name="VirtualTopic.MIGRATION" 
> > prefix="Consumer.*.VirtualTopic."/>
> >                <virtualTopic name="VirtualTopic.>" 
> > prefix="Consumer.*.VirtualTopic."/>
> 
-- 
Nuno Miguel Carvalho Oliveira
@nmcoliveira
Software Engineer

GeoSolutions S.A.S.
Via di Montramito 3/A
55054  Massarosa (LU)
Italy

phone: +39 0584 962313
fax:   +39 0584 1660272
mob:   +39  333 8128928

http://www.geo-solutions.it
http://twitter.com/geosolutions_it

package org.nmco;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class App {

    public static void main(String[] args) throws Exception {
        thread(new Producer("VirtualTopic.>"), false);
        thread(new Producer("VirtualTopic.MIGRATION"), false);
        thread(new Consumer("ConsumerA", "VirtualTopic.>"), false);
        thread(new Consumer("ConsumerB", "VirtualTopic.MIGRATION"), false);
        Thread.sleep(5000);
    }

    // Runs the given producer or consumer on its own thread.
    public static void thread(Runnable runnable, boolean daemon) {
        Thread worker = new Thread(runnable);
        worker.setDaemon(daemon);
        worker.start();
    }

    public static class Producer implements Runnable {

        private final String topicName;

        public Producer(String topicName) {
            this.topicName = topicName;
        }

        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createTopic(topicName);
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                TextMessage message = session.createTextMessage(
                        String.format("This message was from topic '%s'.", topicName));
                producer.send(message);
                session.close();
                connection.close();
            } catch (Exception exception) {
                System.out.println("Caught: " + exception);
                exception.printStackTrace();
            }
        }
    }

    public static class Consumer implements Runnable, ExceptionListener {

        private final String consumerName;
        private final String topicName;

        public Consumer(String consumerName, String topicName) {
            this.consumerName = consumerName;
            this.topicName = topicName;
        }

        public void run() {
            try {
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(String.format("Consumer.%s.%s", consumerName, topicName));
                MessageConsumer consumer = session.createConsumer(destination);
                Message message = consumer.receive(1000);
                while(message != null){
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        String text = textMessage.getText();
                        System.out.println(String.format("Consumer '%s' received: %s", consumerName, text));
                    } else {
                        System.out.println(String.format("Consumer '%s' received: %s", consumerName, message));
                    }
                    message = consumer.receive(1000);
                }
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception exception) {
                System.out.println("Caught: " + exception);
                exception.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occurred.  Shutting down client.");
        }
    }
}
