Yes, in the linked issues in my email it shows both projects plan to add
support for it.

The problem with a high session.timeout.ms is if you've got a message that
is locking your consumer in a perpetual processing cycle, then the consumer
will timeout of the group w/o rejoining. So then another consumer in your
group can get locked. Pretty quickly your whole consumer group can be down
due to that single message.

If instead you've got a decoupled heartbeat, then your stuck consumer won't
drop out of the group, so no rebalance will be triggered, and the other
consumers will happily continue consuming their other partitions.

While neither is a great solution, I'd rather have the latter as at least
some of your partitions are still working fine rather than bringing your
entire pipeline to a complete halt.





On Tue, Feb 21, 2017 at 1:50 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jeff,
>
> You are right that currently kafka-python does not expose two configs, i.e.
> session.timeout.ms and max.poll.timeout.ms as the Java client does, but I
> think the current setting may be sufficient to tackle Martin's issue alone
> as long as session.timeout.ms can be tuned to be large enough; like you
> said the two configs are for separating the cases between "hard failures
> detected by heartbeat" and "soft failures like long GC / blocked on IO /
> simply processing a record takes long time".
>
> BTW I believe fully supporting KIP-62 is on the roadmap of kafka-python
> already, and maybe Magnus (cc'ed) can explain a bit more on the timeline of
> it.
>
>
> Guozhang
>
>
> On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman <j...@netskope.com> wrote:
>
> > As far as I understood it, the primary thrust of KIP-62 was making it so
> > heartbeats could be issued outside of the poll() loop, meaning that the
> > session.timeout.ms could be reduced below the length of time it takes a
> > consumer to process a particular batch of messages.
> >
> > Unfortunately, while both librdkafka (which confluent-kafka-python relies
> > on under the covers) and kafka-python support issuing heartbeats from a
> > background thread, they both currently still tie that heartbeat to the
> > poll() call. So the developer still has to manually tune max.poll.records
> > and session.timeout.ms, versus once this background heartbeating is
> > decoupling from polling, the defaults should be fine for more use-cases.
> >
> > I actually filed tickets against both projects a few weeks ago for this:
> > https://github.com/edenhill/librdkafka/issues/1039
> > https://github.com/dpkp/kafka-python/issues/948
> >
> >
> >
> > On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hi Martin,
> > >
> > > Since 0.10.1 KIP-62 has been added to consumer client, so that the user
> > > does not need to manually call pause / resume.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > >
> > > As for python client, as far as I know this background thread approach
> > has
> > > also been adopted in the Confluent's open source kafka-python client as
> > > well:
> > >
> > > https://github.com/confluentinc/confluent-kafka-python
> > >
> > > So that as long as you are willing to set a `session.timeout.ms` high
> > > enough to cover the maximum processing latency of a single record, the
> > > background thread will be responsible for sending heartbeats and hence
> > > users do not need to worry about them.
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com>
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > I'm processing messages using kafka-python from a single topic, but
> > > > ocassionally I have a message that might take a while to process, so
> > I'd
> > > > like to send heartbeats while processing the message.
> > > >
> > > > My program works something like this:
> > > >
> > > > consumer = KafkaConsumer(...)
> > > > for message in consumer:
> > > >     if should_store(message.value):
> > > >         store_state(message.value)
> > > >     elif should_process(message.value):
> > > >         # This loop might take a while (longer than heartbeat
> interval)
> > > >         for value in stored_state(message.value):
> > > >             do_something(value)
> > > >
> > > > I think I need to call consumer.client.poll() at regular intervals to
> > > send
> > > > the heartbeats, so the code would look like this:
> > > >
> > > > consumer = KafkaConsumer(...)
> > > > for message in consumer:
> > > >     if should_store(message.value):
> > > >         store_state(message.value)
> > > >     elif should_process(message.value):
> > > >         # This loop might take a while (longer that heartbeat
> interval)
> > > >         for index, value in enumerate(stored_state(message.value)):
> > > >             do_something(value)
> > > >             if index % 10000 == 0:
> > > >                 consumer.client.poll()
> > > >
> > > > Is calling KafkaClient.poll() like this safe to do? The documentation
> > for
> > > > KafkaConsumer.poll() says it is incompatible with the iterator
> > interface
> > > > but nothing like that is in KafkaClient.poll() documentation.
> > > >
> > > > Also, there is KafkaConsumer.pause(). Do I need to pause the
> partitions
> > > I'm
> > > > fetching from before calling consumer.client.poll()? Based on what I
> > have
> > > > seen in the code it looks like calling pause() will discard the
> > buffered
> > > > messages fetched for that partition so far and then fetch them again
> > when
> > > > calling resume(). Is that correct? In this case I'd rather not call
> > > pause()
> > > > if it is not necessary.
> > > >
> > > > Thanks for clarification.
> > > >
> > > > Best Regards,
> > > > Martin
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to