Hi,

I'm struggling with AMQ 5.9.0 to achieve my goals: durable virtual topics with selectors on STOMP, with a network of four brokers (connected with SSL connectors, with ACLs and certificate authentication/authorization both on client and server side).

In english: I want to publish messages to a -more, but I think it's irrelevant here- (virtual) topic from machines spread in many data centers to AMQ servers in two DCs (2x2 machines, fully meshed). Any publisher or consumer can connect to any of the servers. The messages in the topic have several headers and I would like to filter them into durable queues. So for example Publisher1..10 publishes Type1..10 messages, but Consumer1 only consuming Type1, Consumer2 consuming Type1-3 and so on from durable queues. To protect the queues, I would like to deliver only the messages matching the consumer's selector, and publish the message with a TTL set, so if the consumer for the given queue is away for an extender period of time, the messages should be dropped.

Seems to be fun, but I can't get it to work.

The two behaviours I could get -so far with only one machine and only one publisher/consumer (one queue): - everything works nicely, the queue gets only the relevant messages, but it's not durable. If there is a consumer, it gets the messages, but if nobody listens, nothing gets to the queue. - the queue gets all of the messages (not just the ones, the selector would allow) and is durable. However, the consumer gets the messages in bursts, like around 130 messages per second and nothing for about a minute, then another 130 messages and nothing for a minute, while the queue is full with messages.

The configuration I use is:
http://pastebin.com/d8rkB0Yc

The difference between the two, described above is the selectorAware true setting, commented out in the pastebin config.

I use a python client, publish to /topic/VirtualTopic.radius and consume from /queue/Consumer.radiusmq.VirtualTopic.radius with the following code snippet:
    conn.connect(headers={'client-id':'radiusmq'})
    conn.subscribe(headers={
'destination':'/queue/Consumer.radiusmq.VirtualTopic.radius',
                            'ack':'client',
                            'id':1,
                            'activemq.prefetchSize':1000,
                            'selector':"Xsystem = 'wired' AND ("
                            "Xstatustype = 'STOP' OR "
                            "Xstatustype = 'INTERIM_UPDATE')",
                            }
                   )

I have some graphs about the latter case, if helps, however, I would like to get the former working, but with a durable queue.

Thanks,

Reply via email to