Hi Martin,

Since 0.10.1, KIP-62 has been part of the Java consumer client, so the user no longer needs to call pause / resume manually.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

As for the Python client, as far as I know this background-thread approach has also been adopted in Confluent's open source Python client, confluent-kafka-python:

https://github.com/confluentinc/confluent-kafka-python

So as long as you are willing to set `session.timeout.ms` high enough to cover the maximum processing latency of a single record, the background thread will take care of sending heartbeats, and users do not need to worry about them.

Guozhang

On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com> wrote:

> Hello,
>
> I'm processing messages using kafka-python from a single topic, but
> occasionally I have a message that might take a while to process, so I'd
> like to send heartbeats while processing the message.
>
> My program works something like this:
>
> consumer = KafkaConsumer(...)
> for message in consumer:
>     if should_store(message.value):
>         store_state(message.value)
>     elif should_process(message.value):
>         # This loop might take a while (longer than heartbeat interval)
>         for value in stored_state(message.value):
>             do_something(value)
>
> I think I need to call consumer.client.poll() at regular intervals to send
> the heartbeats, so the code would look like this:
>
> consumer = KafkaConsumer(...)
> for message in consumer:
>     if should_store(message.value):
>         store_state(message.value)
>     elif should_process(message.value):
>         # This loop might take a while (longer than heartbeat interval)
>         for index, value in enumerate(stored_state(message.value)):
>             do_something(value)
>             if index % 10000 == 0:
>                 consumer.client.poll()
>
> Is calling KafkaClient.poll() like this safe to do? The documentation for
> KafkaConsumer.poll() says it is incompatible with the iterator interface,
> but nothing like that is in the KafkaClient.poll() documentation.
>
> Also, there is KafkaConsumer.pause().
> Do I need to pause the partitions I'm fetching from before calling
> consumer.client.poll()? Based on what I have seen in the code, it looks
> like calling pause() will discard the buffered messages fetched for that
> partition so far and then fetch them again when calling resume(). Is that
> correct? In this case I'd rather not call pause() if it is not necessary.
>
> Thanks for clarification.
>
> Best Regards,
> Martin
>

--
-- Guozhang
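
For reference, here is a minimal sketch of the setup described in the reply, using confluent-kafka-python. The broker address, group id, topic name, the 300-second processing bound, and the helper names build_config, consume and main are illustrative assumptions, not taken from the thread. It only shows where `session.timeout.ms` would be set and how a plain poll loop looks when the client sends heartbeats on its own.

```python
def build_config(bootstrap_servers, group_id, max_processing_seconds):
    """Return a consumer config whose session timeout covers the slowest record."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        # Assumption: no single record takes longer than max_processing_seconds.
        "session.timeout.ms": int(max_processing_seconds * 1000),
    }


def consume(consumer, handle, max_messages=None):
    """Poll records one at a time and pass each payload to handle().

    Runs until max_messages payloads have been handled (forever if None).
    """
    processed = 0
    while max_messages is None or processed < max_messages:
        msg = consumer.poll(1.0)  # None if nothing arrived within 1 second
        if msg is None:
            continue
        if msg.error():
            # Error events are delivered through poll() as well; this sketch
            # just logs and skips them.
            print("consumer error: {}".format(msg.error()))
            continue
        handle(msg.value())  # may be slow; no manual heartbeat calls here
        processed += 1
    return processed


def main():
    # Imported here so the helpers above can be used without the package.
    from confluent_kafka import Consumer  # pip install confluent-kafka

    # Hypothetical broker, group and topic names.
    consumer = Consumer(build_config("localhost:9092", "example-group", 300))
    consumer.subscribe(["example-topic"])
    try:
        consume(consumer, print)
    finally:
        consumer.close()
```

Calling main() against a reachable broker starts the loop. The consume() helper takes any object with a compatible poll() method, so the loop can be exercised without a broker.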