Martin,

If you have called, for example, commitAsync() before you call client.poll(),
then the pending commit request will be sent during that poll.
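
To make that concrete, a minimal sketch (it assumes kafka-python's
KafkaConsumer.commit_async() and the underlying client reachable as
consumer.client, as in the snippet further down this thread, whose placeholder
helpers it reuses):

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', group_id='my-group',
                         enable_auto_commit=False)
for message in consumer:
    consumer.commit_async()     # only queues an offset commit request
    for value in stored_state(message.value):
        do_something(value)
        consumer.client.poll()  # the queued commit request is actually
                                # transmitted on a poll like this one, i.e.
                                # while the message is still being processed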


Guozhang


On Wed, Feb 22, 2017 at 2:04 AM, Martin Sucha <msu...@exponea.com> wrote:

> Hi,
>
> thank you both for responses.
>
> I forgot to mention which python client I'm using, so let me fix that
> first:
> Currently I'm using https://github.com/dpkp/kafka-python
>
> As Jeff mentioned, if there is a message that takes too long to process,
> the group will rebalance and it will affect the whole consumer group.
>
> In my particular case I'm able to call some function periodically during
> the long processing of that message, so I thought that I could just call into
> the Kafka client to let it do its I/O in my thread. If that were possible, I
> would not need to set the session timeout higher (so that crashes get detected
> quickly, while also allowing a longer time to process that single type of
> message).
>
> As far as I understand, calling KafkaClient.poll() manually should send any
> pending heartbeats, but I wanted to check whether it might put the consumer
> into an unexpected state or not. Are there any things that might break
> if I call KafkaClient.poll() (not to be confused with KafkaConsumer.poll()
> which I don't intend to call there) in the middle of processing a message?
> For example, could a commit be sent for that particular message in this
> case?
>
> If the above approach would not work, I'll probably need to increase the
> session timeout.
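> 
> Something like the following is what I have in mind for that fallback (a
> sketch only; session_timeout_ms and heartbeat_interval_ms are kafka-python's
> constructor arguments for those configs, the values are placeholders, and
> request_timeout_ms may also need to stay above the session timeout):
> 
> consumer = KafkaConsumer(
>     'my-topic',
>     group_id='my-group',
>     session_timeout_ms=120000,    # long enough to cover the slowest message
>     heartbeat_interval_ms=10000,  # heartbeats are still tied to polling here
>     request_timeout_ms=130000,    # kept above session_timeout_ms
> )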
>
> Martin
>
>
> On Tue, Feb 21, 2017 at 11:13 PM, Jeff Widman <j...@netskope.com> wrote:
>
> > Yes, the issues linked in my email show that both projects plan to add
> > support for it.
> >
> > The problem with a high session.timeout.ms is that if you've got a message
> > that locks your consumer in a perpetual processing cycle, then the consumer
> > will time out of the group without rejoining. So then another consumer in
> > your group can get locked. Pretty quickly your whole consumer group can be
> > down due to that single message.
> >
> > If instead you've got a decoupled heartbeat, then your stuck consumer won't
> > drop out of the group, so no rebalance will be triggered, and the other
> > consumers will happily continue consuming their other partitions.
> >
> > While neither is a great solution, I'd rather have the latter as at least
> > some of your partitions are still working fine rather than bringing your
> > entire pipeline to a complete halt.
> >
> >
> >
> >
> >
> > On Tue, Feb 21, 2017 at 1:50 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Jeff,
> > >
> > > You are right that currently kafka-python does not expose the two configs,
> > > i.e. session.timeout.ms and max.poll.interval.ms, as the Java client does,
> > > but I think the current setting may be sufficient to tackle Martin's issue
> > > alone as long as session.timeout.ms can be tuned to be large enough; like
> > > you said, the two configs are for separating the cases of "hard failures
> > > detected by heartbeat" and "soft failures like long GC / blocked on IO /
> > > simply processing a record taking a long time".
> > >
> > > BTW I believe fully supporting KIP-62 is already on the roadmap of
> > > kafka-python, and maybe Magnus (cc'ed) can explain a bit more about its
> > > timeline.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman <j...@netskope.com> wrote:
> > >
> > > > As far as I understood it, the primary thrust of KIP-62 was making it so
> > > > heartbeats could be issued outside of the poll() loop, meaning that the
> > > > session.timeout.ms could be reduced below the length of time it takes a
> > > > consumer to process a particular batch of messages.
> > > >
> > > > Unfortunately, while both librdkafka (which confluent-kafka-python relies
> > > > on under the covers) and kafka-python support issuing heartbeats from a
> > > > background thread, they both currently still tie that heartbeat to the
> > > > poll() call. So the developer still has to manually tune max.poll.records
> > > > and session.timeout.ms, versus once this background heartbeating is
> > > > decoupled from polling, the defaults should be fine for more use-cases.
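> > > >
> > > > With kafka-python, that manual tuning would look roughly like this
> > > > (max_poll_records and session_timeout_ms are the constructor arguments
> > > > for those configs; the values are illustrative only):
> > > >
> > > > consumer = KafkaConsumer(
> > > >     'my-topic',
> > > >     group_id='my-group',
> > > >     max_poll_records=50,        # smaller batches so each poll() returns sooner
> > > >     session_timeout_ms=60000,   # still has to cover the slowest batch
> > > > )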
> > > >
> > > > I actually filed tickets against both projects a few weeks ago for this:
> > > > https://github.com/edenhill/librdkafka/issues/1039
> > > > https://github.com/dpkp/kafka-python/issues/948
> > > >
> > > >
> > > >
> > > > On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hi Martin,
> > > > >
> > > > > Since 0.10.1, KIP-62 has been added to the consumer client, so the user
> > > > > does not need to manually call pause / resume.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > > >
> > > > > As for the Python client, as far as I know this background-thread
> > > > > approach has also been adopted in Confluent's open source
> > > > > confluent-kafka-python client:
> > > > >
> > > > > https://github.com/confluentinc/confluent-kafka-python
> > > > >
> > > > > So as long as you are willing to set `session.timeout.ms` high enough
> > > > > to cover the maximum processing latency of a single record, the
> > > > > background thread will be responsible for sending heartbeats and hence
> > > > > users do not need to worry about them.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > I'm processing messages from a single topic using kafka-python, but
> > > > > > occasionally I have a message that might take a while to process, so
> > > > > > I'd like to send heartbeats while processing the message.
> > > > > >
> > > > > > My program works something like this:
> > > > > >
> > > > > > consumer = KafkaConsumer(...)
> > > > > > for message in consumer:
> > > > > >     if should_store(message.value):
> > > > > >         store_state(message.value)
> > > > > >     elif should_process(message.value):
> > > > > >         # This loop might take a while (longer than heartbeat interval)
> > > > > >         for value in stored_state(message.value):
> > > > > >             do_something(value)
> > > > > >
> > > > > > I think I need to call consumer.client.poll() at regular intervals to
> > > > > > send the heartbeats, so the code would look like this:
> > > > > >
> > > > > > consumer = KafkaConsumer(...)
> > > > > > for message in consumer:
> > > > > >     if should_store(message.value):
> > > > > >         store_state(message.value)
> > > > > >     elif should_process(message.value):
> > > > > >         # This loop might take a while (longer than heartbeat interval)
> > > > > >         for index, value in enumerate(stored_state(message.value)):
> > > > > >             do_something(value)
> > > > > >             if index % 10000 == 0:
> > > > > >                 consumer.client.poll()
> > > > > >
> > > > > > Is calling KafkaClient.poll() like this safe to do? The documentation
> > > > > > for KafkaConsumer.poll() says it is incompatible with the iterator
> > > > > > interface, but nothing like that is in the KafkaClient.poll()
> > > > > > documentation.
> > > > > >
> > > > > > Also, there is KafkaConsumer.pause(). Do I need to pause the partitions
> > > > > > I'm fetching from before calling consumer.client.poll()? Based on what
> > > > > > I have seen in the code, it looks like calling pause() will discard the
> > > > > > buffered messages fetched for that partition so far and then fetch them
> > > > > > again when calling resume(). Is that correct? In this case I'd rather
> > > > > > not call pause() if it is not necessary.
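> > > > > >
> > > > > > For reference, the pause() variant I'm describing would look roughly
> > > > > > like this (just a sketch using kafka-python's assignment(), pause() and
> > > > > > resume(); whether it is actually needed is exactly my question):
> > > > > >
> > > > > > partitions = consumer.assignment()
> > > > > > consumer.pause(*partitions)
> > > > > > try:
> > > > > >     for index, value in enumerate(stored_state(message.value)):
> > > > > >         do_something(value)
> > > > > >         if index % 10000 == 0:
> > > > > >             consumer.client.poll()
> > > > > > finally:
> > > > > >     consumer.resume(*partitions)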
> > > > > >
> > > > > > Thanks for clarification.
> > > > > >
> > > > > > Best Regards,
> > > > > > Martin
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang
