Martin,

If you call, for example, commitAsync() before you call client.poll(), then the commit request will be sent during that poll() call.

Guozhang

On Wed, Feb 22, 2017 at 2:04 AM, Martin Sucha <msu...@exponea.com> wrote:

> Hi,
>
> thank you both for your responses.
>
> I forgot to mention which Python client I'm using, so let me fix that
> first: currently I'm using https://github.com/dpkp/kafka-python
>
> As Jeff mentioned, if there is a message that takes too long to process,
> the group will rebalance and it will affect the whole consumer group.
>
> In my particular case I'm able to call some function periodically during
> the long processing of that message, so I thought that I could just call
> into the Kafka client to do its I/O in my thread. If that were possible I
> would not need to set the session timeout higher (so that crashes get
> detected quickly, while also allowing a longer time to process that single
> type of message).
>
> As far as I understand, calling KafkaClient.poll() manually should send any
> pending heartbeats, but I wanted to check whether it might put the consumer
> into an unexpected state. Are there any things that might break if I call
> KafkaClient.poll() (not to be confused with KafkaConsumer.poll(), which I
> don't intend to call there) in the middle of processing a message? For
> example, could a commit be sent for that particular message in this case?
>
> If the above approach does not work, I'll probably need to increase the
> session timeout.
>
> Martin
>
> On Tue, Feb 21, 2017 at 11:13 PM, Jeff Widman <j...@netskope.com> wrote:
>
> > Yes, in the linked issues in my email it shows both projects plan to add
> > support for it.
> >
> > The problem with a high session.timeout.ms is that if you've got a message
> > that is locking your consumer in a perpetual processing cycle, then the
> > consumer will time out of the group without rejoining. So then another
> > consumer in your group can get locked. Pretty quickly your whole consumer
> > group can be down due to that single message.
> >
> > If instead you've got a decoupled heartbeat, then your stuck consumer
> > won't drop out of the group, so no rebalance will be triggered, and the
> > other consumers will happily continue consuming their other partitions.
> >
> > While neither is a great solution, I'd rather have the latter, as at least
> > some of your partitions are still working fine rather than bringing your
> > entire pipeline to a complete halt.
> >
> > On Tue, Feb 21, 2017 at 1:50 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Jeff,
> > >
> > > You are right that currently kafka-python does not expose two configs,
> > > i.e. session.timeout.ms and max.poll.interval.ms, as the Java client
> > > does, but I think the current setting may be sufficient to tackle
> > > Martin's issue alone as long as session.timeout.ms can be tuned to be
> > > large enough; like you said, the two configs are for separating the
> > > cases between "hard failures detected by heartbeat" and "soft failures
> > > like long GC / blocked on IO / simply processing a record takes a long
> > > time".
> > >
> > > BTW I believe fully supporting KIP-62 is on the roadmap of kafka-python
> > > already, and maybe Magnus (cc'ed) can explain a bit more on the timeline
> > > of it.
> > >
> > > Guozhang
> > >
> > > On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman <j...@netskope.com> wrote:
> > >
> > > > As far as I understood it, the primary thrust of KIP-62 was making it
> > > > so heartbeats could be issued outside of the poll() loop, meaning that
> > > > session.timeout.ms could be reduced below the length of time it takes
> > > > a consumer to process a particular batch of messages.
> > > >
> > > > Unfortunately, while both librdkafka (which confluent-kafka-python
> > > > relies on under the covers) and kafka-python support issuing
> > > > heartbeats from a background thread, they both currently still tie
> > > > that heartbeat to the poll() call. So the developer still has to
> > > > manually tune max.poll.records and session.timeout.ms, whereas once
> > > > this background heartbeating is decoupled from polling, the defaults
> > > > should be fine for more use cases.
> > > >
> > > > I actually filed tickets against both projects a few weeks ago for
> > > > this:
> > > > https://github.com/edenhill/librdkafka/issues/1039
> > > > https://github.com/dpkp/kafka-python/issues/948
> > > >
> > > > On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hi Martin,
> > > > >
> > > > > Since 0.10.1, KIP-62 has been added to the consumer client, so that
> > > > > the user does not need to manually call pause / resume.
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > > >
> > > > > As for the Python client, as far as I know this background thread
> > > > > approach has also been adopted in Confluent's open source Python
> > > > > client:
> > > > >
> > > > > https://github.com/confluentinc/confluent-kafka-python
> > > > >
> > > > > So as long as you are willing to set `session.timeout.ms` high
> > > > > enough to cover the maximum processing latency of a single record,
> > > > > the background thread will be responsible for sending heartbeats and
> > > > > hence users do not need to worry about them.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > I'm processing messages using kafka-python from a single topic, but
> > > > > > occasionally I have a message that might take a while to process, so
> > > > > > I'd like to send heartbeats while processing the message.
> > > > > >
> > > > > > My program works something like this:
> > > > > >
> > > > > >     consumer = KafkaConsumer(...)
> > > > > >     for message in consumer:
> > > > > >         if should_store(message.value):
> > > > > >             store_state(message.value)
> > > > > >         elif should_process(message.value):
> > > > > >             # This loop might take a while (longer than heartbeat interval)
> > > > > >             for value in stored_state(message.value):
> > > > > >                 do_something(value)
> > > > > >
> > > > > > I think I need to call consumer.client.poll() at regular intervals
> > > > > > to send the heartbeats, so the code would look like this:
> > > > > >
> > > > > >     consumer = KafkaConsumer(...)
> > > > > >     for message in consumer:
> > > > > >         if should_store(message.value):
> > > > > >             store_state(message.value)
> > > > > >         elif should_process(message.value):
> > > > > >             # This loop might take a while (longer than heartbeat interval)
> > > > > >             for index, value in enumerate(stored_state(message.value)):
> > > > > >                 do_something(value)
> > > > > >                 if index % 10000 == 0:
> > > > > >                     consumer.client.poll()
> > > > > >
> > > > > > Is calling KafkaClient.poll() like this safe to do? The documentation
> > > > > > for KafkaConsumer.poll() says it is incompatible with the iterator
> > > > > > interface, but nothing like that is in the KafkaClient.poll()
> > > > > > documentation.
> > > > > >
> > > > > > Also, there is KafkaConsumer.pause(). Do I need to pause the
> > > > > > partitions I'm fetching from before calling consumer.client.poll()?
> > > > > > Based on what I have seen in the code, it looks like calling pause()
> > > > > > will discard the buffered messages fetched for that partition so far
> > > > > > and then fetch them again when calling resume(). Is that correct? In
> > > > > > this case I'd rather not call pause() if it is not necessary.
> > > > > >
> > > > > > Thanks for the clarification.
> > > > > >
> > > > > > Best Regards,
> > > > > > Martin
> > > > >
> > > > > --
> > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang

--
-- Guozhang
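
For reference, here is a minimal sketch of the periodic-call pattern from Martin's original message, kept independent of any Kafka client so that it runs on its own. The helper name `with_keepalive` and its parameters (`every_n`, `max_interval_s`, `clock`) are made up for illustration and are not part of kafka-python. Passing `consumer.client.poll` as the `keepalive` callable is an assumption taken from the question in the thread; whether that call is safe, or enough to keep the consumer in the group, is exactly what the thread is asking and is not confirmed here.

```python
import time


def with_keepalive(items, keepalive, every_n=10000, max_interval_s=None,
                   clock=time.monotonic):
    """Yield items unchanged, calling keepalive() between items.

    keepalive() is called after every `every_n` items, and also whenever at
    least `max_interval_s` seconds have passed since the previous call (if
    `max_interval_s` is given). All names here are hypothetical.
    """
    if every_n < 1:
        raise ValueError("every_n must be at least 1")
    last_call = clock()
    for count, item in enumerate(items, start=1):
        # Hand the item to the caller; execution resumes here once the
        # caller has finished processing it and asks for the next one.
        yield item
        due_by_count = count % every_n == 0
        due_by_time = (max_interval_s is not None
                       and clock() - last_call >= max_interval_s)
        if due_by_count or due_by_time:
            keepalive()
            last_call = clock()


# Hypothetical usage with the loop from the thread (not executed here;
# `consumer`, `stored_state` and `do_something` come from Martin's example):
#
#     for value in with_keepalive(stored_state(message.value),
#                                 consumer.client.poll,
#                                 every_n=10000, max_interval_s=1.0):
#         do_something(value)
```

Adding a time-based trigger next to the item count is a design choice of this sketch: a fixed count such as 10000 only bounds the gap between calls if each item takes a predictable amount of time.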