Greetings!

I am the maintainer of kafka-python. Very cool to see it used in the wild.

The kafka-python library supports the low-level protocols of Kafka 0.7 (Produce/Fetch/MultiProduce/MultiFetch). When you ask Kafka for messages via a Fetch request, you specify an offset + range (much like reading a file). The `iter_messages` helper returns an iterator that automatically handles paging offsets through successive Fetch requests. However, it does not support _saving_ your offsets. One of the parameters to iter_messages is the offset to start at, so when you re-run your script it will start from that same point again.

In 0.7, clients must talk to ZooKeeper in order to persist offsets in a Kafka-compatible way (or they could just save them locally depending on the use case). Talking to ZooKeeper from Python is somewhat troublesome, and implementing the Kafka "consumer group rebalancing" is even more troublesome - so I chose to omit it.
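If local persistence is enough for your use case, the pattern looks roughly like this. This is only a sketch: `fetch_messages` is a hypothetical stand-in for whatever fetch call your client exposes (it is not part of kafka-python), and the checkpoint file name is made up. The point is just checkpointing the next offset to disk between runs:

```python
import os

OFFSET_FILE = "my-topic.offset"  # hypothetical local checkpoint file

def load_offset():
    # Resume from the last saved offset, or 0 on the first run
    if os.path.exists(OFFSET_FILE):
        with open(OFFSET_FILE) as f:
            return int(f.read().strip())
    return 0

def save_offset(offset):
    # Persist the next offset to start from on the next run
    with open(OFFSET_FILE, "w") as f:
        f.write(str(offset))

def consume_once(fetch_messages):
    # fetch_messages(offset) -> (messages, next_offset) is a stand-in
    # for the actual fetch call of your client
    offset = load_offset()
    messages, next_offset = fetch_messages(offset)
    for msg in messages:
        print(msg)
    save_offset(next_offset)
    return messages
```

With this, a second run only sees messages appended since the saved offset, instead of replaying the whole topic. Note this gives you "at least once" behavior at best, and no consumer-group coordination.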

In 0.8 (not yet released), consumer offsets are managed centrally by the Kafka brokers and have APIs for clients to commit and fetch offsets. I am in the process of implementing a 0.8 compatible version of kafka-python.

So for the time being, you are on your own with regards to offset management :-/

Cheers!

-David

On 2/16/13 1:35 PM, Philip O'Toole wrote:
You need to read the Kafka design docs. Kafka does not delete messages just 
because a Consumer reads them. It does not track which messages have been 
consumed by any Consumer.

It is up to Consumers to pick up where they left off, by always asking for 
the right message (via offsets).

Philip

On Feb 16, 2013, at 4:48 AM, David Montgomery <davidmontgom...@gmail.com> wrote:

Hi,

I have ZooKeeper and Kafka set up.

I am using this python client:  https://github.com/mumrah/kafka-python

I can send and receive messages but they are not deleted.

How can I send a message to Kafka such that no other consumer can use it?


I feel I am missing something about how Kafka works.

from kafka import KafkaClient

def produce():
    kafka = KafkaClient("xxx.xxx", 9092)
    kafka.send_messages_simple("my-topic", "some message")
    kafka.close()
    print 'done'

def consume():
    kafka = KafkaClient("xxx.xxx", 9092)
    # topic, partition 0, starting offset 0 (i.e. from the beginning)
    for msg in kafka.iter_messages("my-topic", 0, 0, 1024 * 1024, False):
        print msg.payload
    kafka.close()
    print 'done'

Every time I ran consume, it returned all the messages from previous runs
plus the new ones.

Am I missing something on the server.properties file?

Thanks
