-----Original Message-----
From: Timothy Bish [mailto:tabish...@gmail.com]
Sent: 14 September 2010 17:08
To: users@activemq.apache.org
Subject: RE: Individual acknowledge in pyactivemq
On Tue, 2010-09-14 at 17:00 +0100, Neil Pritchard wrote:
> On Tue, 2010-09-14 at 14:17 +0100, Neil Pritchard wrote:
> > Hi All,
> >
> > I'm using pyactivemq as both a producer and consumer of messages which are
> > being brokered by ActiveMQ 5.3.2. In the past I used python stompy to
> > produce messages and java ActiveMQ to consume them. I need to set a
> > prefetch policy of 1 message at a time and, more importantly, use individual
> > acknowledge (or client acknowledge), but I can't find any example of how to
> > acknowledge the message in pyactivemq. Also, the ActiveMQ log (set to debug)
> > indicates that the messages are being consumed regardless. Can anyone tell
> > me how to set the acknowledgement mode properly and how to acknowledge a
> > message? When I try to acknowledge, Python can't find the Acknowledge()
> > method.
> >
> > Many thanks,
> >
> > Neil
> >
> > Here's my consumer:....
> >
> > #!/usr/bin/env python
> >
> > import cpickle as pickle
> > import Queue
> > import pyactivemq
> > from pyactivemq import ActiveMQConnectionFactory
> > from pyactivemq import AcknowledgeMode
> >
> > class MessageListener(pyactivemq.MessageListener):
> >     def __init__(self, name, queue):
> >         pyactivemq.MessageListener.__init__(self)
> >         self.name = name
> >         self.queue = queue
> >
> >     def onMessage(self, message):
> >         self.queue.put('%s got: %s' % (self.name, message.text))
> >
> > f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?wireFormat=openwire')
> > conn = f.createConnection()
> > consumer_session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > myqueue = consumer_session.createQueue('NOTIFICATIONS.QUEUE')
> > queue = Queue.Queue(0)
> >
> > session = conn.createSession(AcknowledgeMode.INDIVIDUAL_ACKNOWLEDGE)
> > consumer = session.createConsumer(myqueue)
> > listener = MessageListener('consumer', queue)
> > consumer.messageListener = listener
> >
> > conn.start()
> > while queue:
> >     message = queue.get(block=True)
> >     message = message[14:]
> >     notificationDict = pickle.loads(message)
> >     print notificationDict
> >     acknowledge()
> >
> > conn.close()
> >
>
> It looks as if the acknowledge method is exposed on each Message object,
> so you should be able to ack them as they are received. Whether or not
> it works is another question.
>
> Regards
>

The correct URI setting would be something like:

cms.PrefetchPolicy.topicPrefetch=1
cms.PrefetchPolicy.queuePrefetch=1
etc...

The cms prefetch policy doesn't currently support an "all" configuration
option; you could open a new Jira issue to have it added to v3.2.4.

Regards

--
Tim Bish
Open Source Integration: http://fusesource.com
Follow me on Twitter: http://twitter.com/tabish121
My Blog: http://timbish.blogspot.com/

Hi Tim,

I assume the config goes here....

f = ActiveMQConnectionFactory('failover:(tcp://localhost:61616)?cms.PrefetchPolicy.queuePrefetch=1')

Cheers,
Neil
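
For anyone following along, here is a rough sketch of the two suggestions in this thread; it is not a confirmed recipe. It assumes the prefetch options go in the outer failover query string (Neil's guess above, which nobody in the thread confirms) and that several options are joined with "&", as in an ordinary URI query. It also assumes, per Tim's earlier reply, that each message object exposes acknowledge(); whether that works in pyactivemq is still an open question here. The names build_broker_uri, process_and_ack and FakeMessage are made up for illustration, and FakeMessage only stands in for a real message so the logic can be run without a broker.

```python
def build_broker_uri(host="localhost", port=61616,
                     queue_prefetch=1, topic_prefetch=1):
    """Build a failover URI carrying the prefetch options named in the thread.

    Assumption: the options belong in the outer query string and are
    joined with '&'.
    """
    options = [
        "cms.PrefetchPolicy.queuePrefetch=%d" % queue_prefetch,
        "cms.PrefetchPolicy.topicPrefetch=%d" % topic_prefetch,
    ]
    return "failover:(tcp://%s:%d)?%s" % (host, port, "&".join(options))


def process_and_ack(message, handle):
    """Handle one message, then acknowledge that same message.

    Intended to be called from inside onMessage(). If handle() raises,
    acknowledge() is never reached, so the message stays unacknowledged.
    """
    handle(message.text)
    message.acknowledge()  # assumed to exist on the message, per Tim's reply


class FakeMessage(object):
    """Hypothetical stand-in for a received message (not part of pyactivemq)."""

    def __init__(self, text):
        self.text = text
        self.acked = False

    def acknowledge(self):
        self.acked = True


if __name__ == "__main__":
    print(build_broker_uri())
    msg = FakeMessage("hello")
    received = []
    process_and_ack(msg, received.append)
    print(received, msg.acked)
```

In the consumer above, onMessage would call process_and_ack(message, ...) rather than queueing a formatted string and calling a bare acknowledge() later, since the acknowledge call has to be made on the message object itself.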