Sure (python):
conn.send(data,
          destination='/topic/VirtualTopic.radius',
          headers={
              'content-length': reclen,
              'persistent': 'true',
              'Xsystem': system,
              'Xstatustype': status_type,
              'Xrealm': realm,
              'expires': int((time.time() + 600) * 1000)
          },
          )
I think the problems on the two paths are:
- selectorAware is not durable-aware: if I consume from a queue
(which is fed by a virtual topic) with a selector, it works only in
online mode; the chain doesn't retain the selector when the consumer is
offline. I think I understand this, and this is what I'm asking in the
subject: can AMQ do this (or something like this, durable subscriptions
to a topic with a selector)?
- without selectorAware, the virtual topic posts all messages from the
topic to the queue, and the consumer only gets the ones its selector
allows. My problem with this is: what do I do with the bigger and bigger
pile of messages lurking in the various queues (one for each consumer
with a different selector), and why doesn't it work properly? I mean it
works, but consuming from this setup takes ages: some messages arrive
quickly, then nothing for looong seconds (nearly one minute), then again
a burst and nothing... I don't know what causes this.
Here are two graphs:
https://picasaweb.google.com/104147045962330059540/AMQSelectorProblem#5942509136370538210
https://picasaweb.google.com/104147045962330059540/AMQSelectorProblem#5942509137044783778
Each of them shows the time passing by in seconds on the X axis and the
number of messages sent on the Y.
The first graph shows what happens when a sender posts into a virtual
topic, which has a (queue) consumer without selectors (and without
selectorAware).
The red sender publishes messages, and the green consumer consumes them
with a slight delay, but with the same rate as the sender.
On the second graph, the sender has published 1000 messages, but the
consumer had a selector, which selected only 666 (2/3) of them. You can
see that the sender has published the 666 messages in about a second,
while the consumer got them only in batches with long waits.
I do individual ACKs, but I can't see how a program error could do this.
I receive all selected messages after all, but only with big delays.
But this is just an interesting point, which I don't understand, and my
real problem is the first one:
how to publish in a distributed network of brokers into a topic with
different durable subscribers with selectors.
That's what I would like to know. :)
On 11/04/2013 09:31 PM, Johan Edstrom wrote:
Are you marking the message as persistent?
On Nov 4, 2013, at 1:08 PM, Attila Nagy <b...@fsn.hu> wrote:
Even on activemq.org you can find the terms durable topic and queue. I think
this is a compact way to represent that I want to store/get my messages even
when the consumer is offline.
BTW, I'm afraid you haven't read my mail to the end, or the message didn't get
through.
I'll try to conclude it: I would like to post to a virtual topic in a network
of brokers and consume from this topic (queues) with different clients -with
durable subscriptions, but I hope you won't mind there is no durable
subscription for queues- with different selectors, and I would like to get only
those messages to those queues, which the selectors match.
This works when the consumer is online (consuming from the queue), but when I
remove it, nothing will get into the queue.
Or the other way around: all messages got into the queue even when the consumer
is offline, but the consumer receives messages in very slow batches when I use
a selector.
Without a selector, the full speed can be achieved.
These are two problems, though.
On 11/04/2013 04:56 PM, Aleksandar Ivanisevic wrote:
The only thing that is durable is a consumer, topic per se can not be
durable. Also how is a queue durable? Perhaps you mean a queue with
persistent messages on it?
To have a durable consumer to a topic you need to create a durable
subscription, either through the console or by sending a client-id and
activemq.subscriptionName headers when connecting.
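
A minimal sketch of the two headers mentioned above, written as raw STOMP frames so it runs without a broker or client library. It assumes the usual ActiveMQ convention that client-id goes on the CONNECT frame and activemq.subscriptionName on the SUBSCRIBE frame. The build_frame helper, the host value, and the subscription name radiusmq-wired are hypothetical and only for illustration. Whether such a subscription behaves as wanted across a network of brokers is not answered by this sketch.

```python
def build_frame(command, headers, body=""):
    """Serialize a STOMP frame: command line, header lines, blank line, body, NUL."""
    lines = [command]
    lines.extend("{}:{}".format(key, value) for key, value in headers.items())
    return "\n".join(lines) + "\n\n" + body + "\x00"


# CONNECT carries the client-id that identifies the durable subscriber.
connect_frame = build_frame("CONNECT", {
    "accept-version": "1.1",
    "host": "localhost",          # assumption: broker host name
    "client-id": "radiusmq",
})

# SUBSCRIBE names the durable subscription and carries the selector.
subscribe_frame = build_frame("SUBSCRIBE", {
    "id": "1",
    "destination": "/topic/VirtualTopic.radius",
    "ack": "client",
    "activemq.subscriptionName": "radiusmq-wired",  # hypothetical name
    "selector": "Xsystem = 'wired'",
})
```

With a client library, the same headers would normally be passed to its connect and subscribe calls instead of being serialized by hand.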
Attila Nagy <b...@fsn.hu> writes:
Hi,
I'm struggling with AMQ 5.9.0 to achieve my goals: durable virtual
topics with selectors on STOMP, with a network of four brokers
(connected with SSL connectors, with ACLs and certificate
authentication/authorization both on client and server side).
In English: I want to publish messages to a -more, but I think it's
irrelevant here- (virtual) topic from machines spread in many data
centers to AMQ servers in two DCs (2x2 machines, fully meshed). Any
publisher or consumer can connect to any of the servers.
The messages in the topic have several headers and I would like to
filter them into durable queues.
So for example Publisher1..10 publish Type1..10 messages, but
Consumer1 only consumes Type1, Consumer2 consumes Type1-3 and so on
from durable queues.
To protect the queues, I would like to deliver only the messages
matching the consumer's selector, and publish the message with a TTL
set, so if the consumer for the given queue is away for an extended
period of time, the messages should be dropped.
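
A minimal sketch of the TTL idea, assuming the expires header is interpreted as an absolute expiration time in milliseconds since the epoch (which is how it is used in the send snippet in this thread). The helper names expires_at_ms and is_expired are hypothetical, and is_expired only mimics the check locally; it is not the broker's own expiry logic.

```python
import time


def expires_at_ms(ttl_seconds, now=None):
    """Return an absolute expiration timestamp in milliseconds since the epoch."""
    if now is None:
        now = time.time()
    return int((now + ttl_seconds) * 1000)


def is_expired(expires_ms, now=None):
    """Local check of whether a timestamp produced above has already passed."""
    if now is None:
        now = time.time()
    return now * 1000 >= expires_ms


# Ten-minute TTL, matching the 600 seconds used when publishing.
headers = {"persistent": "true", "expires": expires_at_ms(600)}
```

Passing now explicitly keeps the arithmetic easy to check: expires_at_ms(600, now=1000.0) gives 1600000.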
Seems to be fun, but I can't get it to work.
The two behaviours I could get -so far with only one machine and only
one publisher/consumer (one queue):
- everything works nicely, the queue gets only the relevant messages,
but it's not durable. If there is a consumer, it gets the messages,
but if nobody listens, nothing gets to the queue.
- the queue gets all of the messages (not just the ones the selector
would allow) and is durable. However, the consumer gets the messages
in bursts, like around 130 messages per second and nothing for about a
minute, then another 130 messages and nothing for a minute, while the
queue is full with messages.
The configuration I use is:
http://pastebin.com/d8rkB0Yc
The difference between the two behaviours described above is the
selectorAware true setting, commented out in the pastebin config.
I use a python client, publish to /topic/VirtualTopic.radius and
consume from /queue/Consumer.radiusmq.VirtualTopic.radius with the
following code snippet:
conn.connect(headers={'client-id': 'radiusmq'})
conn.subscribe(headers={
    'destination': '/queue/Consumer.radiusmq.VirtualTopic.radius',
    'ack': 'client',
    'id': 1,
    'activemq.prefetchSize': 1000,
    'selector': "Xsystem = 'wired' AND ("
                "Xstatustype = 'STOP' OR "
                "Xstatustype = 'INTERIM_UPDATE')",
})
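
For checking how many published messages the selector above should let through, the same condition can be approximated locally. This is only a sketch in plain Python, not the broker's selector engine; the function name matches_selector and the sample headers (including the START and wireless values) are hypothetical.

```python
def matches_selector(headers):
    """Local equivalent of:
    Xsystem = 'wired' AND (Xstatustype = 'STOP' OR Xstatustype = 'INTERIM_UPDATE')
    A missing header never matches, as with a NULL property in a selector.
    """
    return (headers.get("Xsystem") == "wired"
            and headers.get("Xstatustype") in ("STOP", "INTERIM_UPDATE"))


# Hypothetical sample of published message headers.
sample = [
    {"Xsystem": "wired", "Xstatustype": "STOP"},
    {"Xsystem": "wired", "Xstatustype": "INTERIM_UPDATE"},
    {"Xsystem": "wired", "Xstatustype": "START"},
    {"Xsystem": "wireless", "Xstatustype": "STOP"},
    {"Xstatustype": "STOP"},  # no Xsystem header at all
]

expected = [h for h in sample if matches_selector(h)]
```

Counting the expected matches on the publisher side gives a number to compare against what the consumer actually receives.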
I have some graphs about the latter case, if that helps. However, I
would like to get the former working, but with a durable queue.
Thanks,