Hello Jeff,

You are right that kafka-python does not currently expose two separate
configs, session.timeout.ms and max.poll.interval.ms, as the Java client
does. Still, I think the current settings may be sufficient to tackle
Martin's issue on their own, as long as session.timeout.ms can be tuned to
be large enough. As you said, the two configs exist to separate "hard
failures detected by heartbeat" from "soft failures like a long GC pause,
blocking on IO, or simply taking a long time to process a record".
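
For Martin's case, a minimal sketch of that tuning might look like the
following. It assumes heartbeats are only sent while poll() is being called,
so the session timeout has to cover the slowest single record. The helper
name `consumer_timeouts` and its `safety_factor` parameter are made up for
illustration; `session_timeout_ms` and `heartbeat_interval_ms` are existing
KafkaConsumer keyword arguments in kafka-python. The one-third ratio follows
the usual recommendation in the Kafka consumer config docs.

```python
def consumer_timeouts(max_record_processing_s, safety_factor=2.0):
    """Build timeout kwargs for kafka-python's KafkaConsumer.

    Hypothetical helper: sizes the session timeout from the slowest
    expected single-record processing time, padded by safety_factor.
    """
    if max_record_processing_s <= 0:
        raise ValueError("max_record_processing_s must be positive")
    session_timeout_ms = int(max_record_processing_s * safety_factor * 1000)
    return {
        # Must exceed the longest gap between two poll() calls.
        "session_timeout_ms": session_timeout_ms,
        # Keep the heartbeat interval at no more than 1/3 of the session timeout.
        "heartbeat_interval_ms": session_timeout_ms // 3,
    }


kwargs = consumer_timeouts(max_record_processing_s=60)
# Assumed usage (needs kafka-python and a reachable broker):
# consumer = KafkaConsumer("my-topic", group_id="my-group", **kwargs)
```

Note that the broker bounds the allowed value via
group.min.session.timeout.ms and group.max.session.timeout.ms, so a very
large session timeout may be rejected when the consumer joins the group.
Depending on the kafka-python version, request_timeout_ms may also need to
be larger than session_timeout_ms.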

BTW I believe fully supporting KIP-62 is on the roadmap of kafka-python
already, and maybe Magnus (cc'ed) can explain a bit more on the timeline of
it.


Guozhang


On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman <j...@netskope.com> wrote:

> As far as I understood it, the primary thrust of KIP-62 was making it so
> heartbeats could be issued outside of the poll() loop, meaning that the
> session.timeout.ms could be reduced below the length of time it takes a
> consumer to process a particular batch of messages.
>
> Unfortunately, while both librdkafka (which confluent-kafka-python relies
> on under the covers) and kafka-python support issuing heartbeats from a
> background thread, they both currently still tie that heartbeat to the
> poll() call. So the developer still has to manually tune max.poll.records
> and session.timeout.ms, whereas once background heartbeating is decoupled
> from polling, the defaults should be fine for more use cases.
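
(Inline note: the decoupling described above can be pictured with a toy
model in plain Python. This is only a sketch of the idea, not how
librdkafka, kafka-python, or the Java client implement it. The function name
`process_with_background_heartbeat` is made up, and the "heartbeat" is just
a timestamp appended to a list where a real client would send a heartbeat
request to the group coordinator.)

```python
import threading
import time


def process_with_background_heartbeat(work, heartbeat_interval_s):
    """Run work() on the calling thread while a helper thread heartbeats.

    Returns the number of simulated heartbeats sent while work() ran.
    """
    stop = threading.Event()
    beats = []

    def heartbeat_loop():
        # Event.wait() returns False on timeout and True once stop is set,
        # so this loop beats once per interval until processing finishes.
        while not stop.wait(heartbeat_interval_s):
            beats.append(time.monotonic())

    thread = threading.Thread(target=heartbeat_loop, daemon=True)
    thread.start()
    try:
        work()  # slow processing no longer blocks the heartbeats
    finally:
        stop.set()
        thread.join()
    return len(beats)


# Simulate a slow batch: heartbeats keep flowing during the sleep.
sent = process_with_background_heartbeat(lambda: time.sleep(0.5), 0.05)
```

With heartbeats running independently like this, session.timeout.ms only
has to detect a dead process, and a separate limit can cover slow
processing between polls.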
>
> I actually filed tickets against both projects a few weeks ago for this:
> https://github.com/edenhill/librdkafka/issues/1039
> https://github.com/dpkp/kafka-python/issues/948
>
>
>
> On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hi Martin,
> >
> > KIP-62 has been part of the Java consumer client since 0.10.1, so the
> > user no longer needs to call pause / resume manually.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> >
> > As for the Python client, as far as I know this background thread
> > approach has also been adopted in Confluent's open source Python client,
> > confluent-kafka-python:
> >
> > https://github.com/confluentinc/confluent-kafka-python
> >
> > So as long as you are willing to set `session.timeout.ms` high
> > enough to cover the maximum processing latency of a single record, the
> > background thread will be responsible for sending heartbeats, and
> > users do not need to worry about them.
> >
> > Guozhang
> >
> >
> > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com> wrote:
> >
> > > Hello,
> > >
> > > I'm processing messages from a single topic using kafka-python, but
> > > occasionally I have a message that might take a while to process, so
> > > I'd like to send heartbeats while processing the message.
> > >
> > > My program works something like this:
> > >
> > > consumer = KafkaConsumer(...)
> > > for message in consumer:
> > >     if should_store(message.value):
> > >         store_state(message.value)
> > >     elif should_process(message.value):
> > >         # This loop might take a while (longer than heartbeat interval)
> > >         for value in stored_state(message.value):
> > >             do_something(value)
> > >
> > > I think I need to call consumer.client.poll() at regular intervals to
> > > send the heartbeats, so the code would look like this:
> > >
> > > consumer = KafkaConsumer(...)
> > > for message in consumer:
> > >     if should_store(message.value):
> > >         store_state(message.value)
> > >     elif should_process(message.value):
> > >         # This loop might take a while (longer than heartbeat interval)
> > >         for index, value in enumerate(stored_state(message.value)):
> > >             do_something(value)
> > >             if index % 10000 == 0:
> > >                 consumer.client.poll()
> > >
> > > Is calling KafkaClient.poll() like this safe to do? The documentation
> > > for KafkaConsumer.poll() says it is incompatible with the iterator
> > > interface, but nothing like that is in the KafkaClient.poll()
> > > documentation.
> > >
> > > Also, there is KafkaConsumer.pause(). Do I need to pause the partitions
> > > I'm fetching from before calling consumer.client.poll()? Based on what I
> > > have seen in the code, it looks like calling pause() will discard the
> > > buffered messages fetched for that partition so far and then fetch them
> > > again when calling resume(). Is that correct? In this case I'd rather
> > > not call pause() if it is not necessary.
> > >
> > > Thanks for clarification.
> > >
> > > Best Regards,
> > > Martin
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
