As far as I understood it, the primary thrust of KIP-62 was to allow
heartbeats to be sent outside of the poll() loop, so that
session.timeout.ms could be set lower than the time it takes a consumer
to process a single batch of messages.
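A toy model of that trade-off, for illustration only: `ConsumerTimeouts` and `stays_in_group` are hypothetical names, and the model ignores network delay and heartbeat.interval.ms. With heartbeats tied to poll(), a batch must finish within session.timeout.ms; with KIP-62 background heartbeats, the time between polls is instead bounded by max.poll.interval.ms, the setting KIP-62 introduced.

```python
from dataclasses import dataclass


@dataclass
class ConsumerTimeouts:
    session_timeout_ms: int
    max_poll_interval_ms: int


def stays_in_group(batch_processing_ms: int, timeouts: ConsumerTimeouts,
                   background_heartbeats: bool) -> bool:
    """Toy model: does the member survive processing one batch between polls?"""
    if background_heartbeats:
        # KIP-62 style: a separate thread keeps heartbeating during processing,
        # so the session timeout only has to catch dead processes; the time
        # spent between poll() calls is bounded by max.poll.interval.ms instead.
        return batch_processing_ms <= timeouts.max_poll_interval_ms
    # Heartbeats tied to poll(): nothing is sent while the batch is processed,
    # so the whole batch has to finish within the session timeout.
    return batch_processing_ms <= timeouts.session_timeout_ms
```

For example, with `session_timeout_ms=10_000` and `max_poll_interval_ms=300_000` (illustrative values, not recommendations), a 60-second batch fails the poll-tied check but passes the background-heartbeat check.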

Unfortunately, while both librdkafka (which confluent-kafka-python relies
on under the covers) and kafka-python support issuing heartbeats from a
background thread, they both currently still tie that heartbeat to the
poll() call. So the developer still has to manually tune max.poll.records
and session.timeout.ms, whereas once background heartbeating is decoupled
from polling, the defaults should be fine for more use cases.
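While heartbeats are still tied to poll(), the arithmetic behind that manual tuning is roughly: the worst-case time per record times max.poll.records must stay below session.timeout.ms. A back-of-the-envelope helper, where `max_safe_poll_records` and its `safety_factor` margin are hypothetical and only meant to keep the batch well inside the timeout:

```python
def max_safe_poll_records(session_timeout_ms: int,
                          worst_case_ms_per_record: int,
                          safety_factor: float = 0.5) -> int:
    """Largest max.poll.records whose worst-case batch fits inside a fraction
    of session.timeout.ms, assuming heartbeats are only sent from poll()."""
    if worst_case_ms_per_record <= 0:
        raise ValueError("worst_case_ms_per_record must be positive")
    budget_ms = session_timeout_ms * safety_factor
    if worst_case_ms_per_record > budget_ms:
        # Even a one-record batch can outlive the budget: shrinking
        # max.poll.records cannot help, session.timeout.ms has to go up.
        raise ValueError("session.timeout.ms is too low for a single record")
    return int(budget_ms // worst_case_ms_per_record)
```

With session.timeout.ms at 30000 and a worst case of 200 ms per record, it suggests max.poll.records = 75. The catch is that the worst case has to be known up front, which is exactly what a rare slow message breaks.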

I actually filed tickets against both projects a few weeks ago for this:
https://github.com/edenhill/librdkafka/issues/1039
https://github.com/dpkp/kafka-python/issues/948



On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Martin,
>
> Since 0.10.1, KIP-62 has been added to the consumer client, so the user
> does not need to manually call pause / resume.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
>
> As for the Python client, as far as I know this background thread approach
> has also been adopted in Confluent's open source Python client,
> confluent-kafka-python:
>
> https://github.com/confluentinc/confluent-kafka-python
>
> So as long as you are willing to set `session.timeout.ms` high
> enough to cover the maximum processing latency of a single record, the
> background thread will be responsible for sending heartbeats, and
> users do not need to worry about them.
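To make the background-thread idea concrete, here is a generic Python sketch, not librdkafka's or kafka-python's actual implementation: a daemon thread calls a hypothetical `send_heartbeat` callback every `interval_s` seconds while the main thread is busy, and stops when the `with` block exits. A real client would also need the heartbeat to be safe to send concurrently with the consumer's own network traffic, which this sketch does not handle.

```python
import threading
import time
from typing import Callable


class BackgroundHeartbeat:
    """Calls send_heartbeat every interval_s seconds on a daemon thread
    for as long as the context manager is active."""

    def __init__(self, send_heartbeat: Callable[[], None], interval_s: float):
        self._send = send_heartbeat
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Event.wait() returns False on timeout (time to heartbeat) and
        # True as soon as stop is set, which ends the loop.
        while not self._stop.wait(self._interval_s):
            self._send()

    def __enter__(self) -> "BackgroundHeartbeat":
        self._thread.start()
        return self

    def __exit__(self, *exc_info) -> bool:
        self._stop.set()
        self._thread.join()
        return False  # never swallow exceptions from the processing code
```

Usage: wrap the slow processing, e.g. `with BackgroundHeartbeat(send_heartbeat, interval_s=3.0): process(record)`, where `send_heartbeat` and `process` stand in for client- and application-specific code.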
>
> Guozhang
>
>
> On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com> wrote:
>
> > Hello,
> >
> > I'm processing messages from a single topic using kafka-python, but
> > occasionally I have a message that might take a while to process, so I'd
> > like to send heartbeats while processing the message.
> >
> > My program works something like this:
> >
> > from kafka import KafkaConsumer
> >
> > consumer = KafkaConsumer(...)
> > for message in consumer:
> >     if should_store(message.value):
> >         store_state(message.value)
> >     elif should_process(message.value):
> >         # This loop might take a while (longer than heartbeat interval)
> >         for value in stored_state(message.value):
> >             do_something(value)
> >
> > I think I need to call consumer.client.poll() at regular intervals to send
> > the heartbeats, so the code would look like this:
> >
> > from kafka import KafkaConsumer
> >
> > consumer = KafkaConsumer(...)
> > for message in consumer:
> >     if should_store(message.value):
> >         store_state(message.value)
> >     elif should_process(message.value):
> >         # This loop might take a while (longer than heartbeat interval)
> >         for index, value in enumerate(stored_state(message.value)):
> >             do_something(value)
> >             # Give the client a chance to send heartbeats every 10000 values
> >             if index % 10000 == 0:
> >                 consumer.client.poll()
> >
> > Is it safe to call KafkaClient.poll() like this? The documentation for
> > KafkaConsumer.poll() says it is incompatible with the iterator interface,
> > but the KafkaClient.poll() documentation says nothing like that.
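Polling every 10,000 values only bounds the gap between heartbeats if each do_something() call has a roughly known cost. A time-based variant avoids that assumption. In this sketch `process_with_keepalive` and its `clock` parameter are hypothetical, and `keepalive` stands for whatever call actually drives heartbeats (consumer.client.poll in the question); whether that call is safe inside the iterator is the open question above, and the sketch does not answer it.

```python
import time
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def process_with_keepalive(items: Iterable[T],
                           handle: Callable[[T], None],
                           keepalive: Callable[[], None],
                           interval_s: float,
                           clock: Callable[[], float] = time.monotonic) -> None:
    """Handle every item, calling keepalive() whenever at least interval_s
    seconds have passed since the previous keepalive (or since the start)."""
    last = clock()
    for item in items:
        handle(item)
        now = clock()
        if now - last >= interval_s:
            keepalive()
            last = now
```

Injecting `clock` keeps the helper testable without real waiting; in production the default `time.monotonic` is used, which is unaffected by wall-clock adjustments.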
> >
> > Also, there is KafkaConsumer.pause(). Do I need to pause the partitions I'm
> > fetching from before calling consumer.client.poll()? Based on what I have
> > seen in the code, it looks like calling pause() will discard the buffered
> > messages fetched for that partition so far and then fetch them again when
> > calling resume(). Is that correct? If so, I'd rather not call pause()
> > unless it is necessary.
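For reference, the manual pause / poll / resume pattern that KIP-62 makes unnecessary looks roughly like the sketch below. It assumes the outer loop uses KafkaConsumer.poll() rather than the iterator (since the poll() docs say the two don't mix), and that each consumer.poll() call lets a pre-KIP-62 client send a due heartbeat. `process_while_paused` is a hypothetical helper; it only relies on the consumer's assignment(), pause(), resume() and poll(timeout_ms=...) methods, and it does not settle whether pause() drops already-buffered records.

```python
from typing import Any, Callable, Dict, Iterable, List


def process_while_paused(consumer: Any, work: Iterable[Any],
                         handle: Callable[[Any], None],
                         poll_every: int = 10_000) -> Dict[Any, List[Any]]:
    """Run a long job with all assigned partitions paused, calling
    consumer.poll() after every poll_every items to keep the member alive.
    Returns any records poll() still handed back (e.g. for partitions
    assigned by a rebalance mid-job) so the caller can process them."""
    paused = set(consumer.assignment())
    consumer.pause(*paused)
    leftovers: Dict[Any, List[Any]] = {}
    try:
        for index, item in enumerate(work):
            handle(item)
            if (index + 1) % poll_every == 0:
                # Paused partitions are not fetched, so this call is made for
                # its heartbeat side effect; keep anything it does return.
                for tp, records in consumer.poll(timeout_ms=0).items():
                    leftovers.setdefault(tp, []).extend(records)
    finally:
        # Only resume partitions we still own; a rebalance may have revoked some.
        consumer.resume(*(paused & set(consumer.assignment())))
    return leftovers
```

Returning leftovers rather than discarding them is a deliberate choice: if a rebalance assigns new, unpaused partitions during the job, their records would otherwise be lost between polls.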
> >
> > Thanks for the clarification.
> >
> > Best Regards,
> > Martin
> >
>
