Hi Martin,

Since 0.10.1, KIP-62 has been added to the Java consumer client, so the user
no longer needs to call pause / resume manually to keep the consumer alive
during long processing.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

As for the Python clients, as far as I know this background-thread approach
has also been adopted in Confluent's open-source Python client,
confluent-kafka-python:

https://github.com/confluentinc/confluent-kafka-python

So as long as you set `max.poll.interval.ms` (the setting KIP-62 introduced
for this) high enough to cover the maximum time spent processing the records
returned by a single poll, the background thread will take care of sending
heartbeats and you do not need to worry about them.
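For illustration, here is a minimal, stdlib-only Python sketch of the
background-heartbeat idea. It is not how the Java consumer or
confluent-kafka-python actually implement it: `BackgroundHeartbeat` and the
`send_heartbeat` callback are hypothetical names, and the callback only
stands in for a real heartbeat request to the group coordinator. What it
shows is that heartbeats keep firing on their own thread while the main
thread is stuck processing a slow message.

```python
import threading
import time
from typing import Callable


class BackgroundHeartbeat:
    """Calls `send_heartbeat` every `interval_s` seconds on a daemon thread.

    Hypothetical helper, not part of kafka-python or confluent-kafka:
    `send_heartbeat` stands in for the client's real heartbeat request.
    """

    def __init__(self, send_heartbeat: Callable[[], None], interval_s: float) -> None:
        self._send = send_heartbeat
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait() returns True once the stop flag is set, ending the
        # loop; otherwise it times out after interval_s and we heartbeat.
        while not self._stop.wait(self._interval_s):
            self._send()

    def __enter__(self) -> "BackgroundHeartbeat":
        self._thread.start()
        return self

    def __exit__(self, *exc) -> None:
        # Signal the thread and wait for it to finish; exceptions propagate.
        self._stop.set()
        self._thread.join()


if __name__ == "__main__":
    beats = []
    # The lambda records a timestamp instead of sending a real heartbeat.
    with BackgroundHeartbeat(lambda: beats.append(time.monotonic()), interval_s=0.05):
        time.sleep(0.3)  # simulates slow processing of one message
    print(f"heartbeats sent during processing: {len(beats)}")
```

Waiting on a threading.Event instead of calling time.sleep() lets the thread
stop as soon as the `with` block exits, rather than after a full interval.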

Guozhang


On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com> wrote:

> Hello,
>
> I'm processing messages using kafka-python from a single topic, but
> occasionally I have a message that might take a while to process, so I'd
> like to send heartbeats while processing the message.
>
> My program works something like this:
>
> from kafka import KafkaConsumer
>
> consumer = KafkaConsumer(...)
> for message in consumer:
>     if should_store(message.value):
>         store_state(message.value)
>     elif should_process(message.value):
>         # This loop might take a while (longer than heartbeat interval)
>         for value in stored_state(message.value):
>             do_something(value)
>
> I think I need to call consumer.client.poll() at regular intervals to send
> the heartbeats, so the code would look like this:
>
> from kafka import KafkaConsumer
>
> consumer = KafkaConsumer(...)
> for message in consumer:
>     if should_store(message.value):
>         store_state(message.value)
>     elif should_process(message.value):
>         # This loop might take a while (longer than heartbeat interval)
>         for index, value in enumerate(stored_state(message.value)):
>             do_something(value)
>             if index % 10000 == 0:
>                 consumer.client.poll()
>
> Is calling KafkaClient.poll() like this safe to do? The documentation for
> KafkaConsumer.poll() says it is incompatible with the iterator interface
> but nothing like that is in KafkaClient.poll() documentation.
>
> Also, there is KafkaConsumer.pause(). Do I need to pause the partitions I'm
> fetching from before calling consumer.client.poll()? Based on what I have
> seen in the code it looks like calling pause() will discard the buffered
> messages fetched for that partition so far and then fetch them again when
> calling resume(). Is that correct? In this case I'd rather not call pause()
> if it is not necessary.
>
> Thanks for the clarification.
>
> Best Regards,
> Martin
>



-- 
-- Guozhang
