Thanks Ken. Let me add a bit more context.

I have one technique for consuming Event Hub messages working. It uses the 
Messenger() class. Going this route I can see no way to access the filtering 
mechanism that you mentioned. The essence is shown in this gist: 
https://gist.github.com/tomconte/e2a4667185a9bf674f59. 

By my read of the qpid docs and examples the mechanism for filtering or reading 
from an offset is enabled through Container/Connection/create_receiver. 
However, I haven't yet figured out how to connect to Azure Event Hubs via this 
mechanism. Here's the Python code as it stands. It does not throw any errors, 
but also does not display any messages. Once I have figured out how to get the 
messages, then I'll move on to filtering. I'm currently working with this 
document that explains how to connect: 
http://qpid.apache.org/releases/qpid-0.32/programming/book/connections.html#connection-url.
 It has some inaccuracies, such as setting connection.transport. Trying to do 
this throws. Not sure how to get an SSL connection. Tried using connection.url 
= "amqps:..." and also tried connection.url = "amqp:ssl:...". 

Can anyone help?

from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton import Connection

class Recv(MessagingHandler):
    def __init__(self):
        super(Recv, self).__init__()
        self.expected = 10
        self.received = 0

    def on_start(self, event):
        print ("on_start")
        connection = Connection()
        connection.url = "amqp:tcp:myNamespace.servicebus.windows.net"
        connection.username = "RootManageSharedAccessKey"
        connection.password = "myPassword"
        connection.reconnect = True
        connection.open()
        
        event.container.create_receiver(connection, 
"/myEventHub/ConsumerGroups/$Default/Partitions/0")
    
    def on_message(self, event):
        if event.message.id and event.message.id < self.received:
            print(event.message.id)
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print(event.message.body)
            self.received += 1
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()

try:
    Container(Recv()).run()
except KeyboardInterrupt: pass


-----Original Message-----
From: Ken Giusti [mailto:[email protected]] 
Sent: Monday, January 23, 2017 10:57 AM
To: [email protected]
Subject: Re: filtering message stream



----- Original Message -----
> From: "Greg Oliver" <[email protected]>
> To: [email protected]
> Sent: Friday, January 20, 2017 6:56:08 PM
> Subject: filtering message stream
> 
> Using Python. Proton 0.16.0.
> 
> Would like to receive messages from an Azure Event Hub using proton. 
> Need to set message filters based on offset or timestamp.
> 
> I've read about adding annotations to a request such as: x-opt-offset, 
> x-opt-enqueued-time, x-opt-sequence-number.
> 
> But no idea how to set these values, especially using Python.
> 
> Is there a book or other resource with examples?
> 

Hi Greg,

In python you can access/set the message annotations via the annotation 
property of the proton.Message class.

You should set this to a map which uses the 'x-opt...' values as keys before 
sending the message.  Example:

import proton

m = proton.Message()
m.annotations = {'x-opt-offset': '100', .... }

caveat: totally untested, just suggesting this from what I've read here:

https://amqpnetlite.codeplex.com/wikipage?title=Using%20Amqp.Net%20Lite%20with%20Azure%20Server%20Bus%20Event%20Hub

-K


> Thanks,
> Greg
> 

--
-K

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected] For additional 
commands, e-mail: [email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to