Hi, thank you both for the responses.
I forgot to mention which Python client I'm using, so let me fix that first: I'm currently using https://github.com/dpkp/kafka-python

As Jeff mentioned, if a message takes too long to process, the group will rebalance, and that affects the whole consumer group. In my particular case I'm able to call a function periodically while that long message is being processed, so I thought I could let the Kafka client do its I/O from my thread at those points. If that were possible, I would not need to raise the session timeout: crashes would still be detected quickly, while that one type of message could still take longer to process.

As far as I understand, calling KafkaClient.poll() manually should send any pending heartbeats, but I wanted to check whether it might put the consumer into an unexpected state. Are there any things that might break if I call KafkaClient.poll() (not to be confused with KafkaConsumer.poll(), which I don't intend to call there) in the middle of processing a message? For example, could a commit be sent for that particular message in this case?

If the above approach does not work, I'll probably need to increase the session timeout.

Martin

On Tue, Feb 21, 2017 at 11:13 PM, Jeff Widman <j...@netskope.com> wrote:
> Yes, the issues linked in my email show that both projects plan to add support for it.
>
> The problem with a high session.timeout.ms is that if a message locks your consumer in a perpetual processing cycle, the consumer will time out of the group without rejoining. Its partitions are then reassigned, so another consumer in your group can get locked on the same message. Pretty quickly your whole consumer group can be down due to that single message.
>
> If instead you've got a decoupled heartbeat, your stuck consumer won't drop out of the group, so no rebalance will be triggered, and the other consumers will happily continue consuming their other partitions.
>
> While neither is a great solution, I'd rather have the latter, as at least some of your partitions keep working instead of your entire pipeline coming to a complete halt.
>
> On Tue, Feb 21, 2017 at 1:50 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Jeff,
> >
> > You are right that kafka-python currently does not expose the two separate configs session.timeout.ms and max.poll.interval.ms as the Java client does, but I think the current setting may be sufficient to tackle Martin's issue on its own, as long as session.timeout.ms can be tuned to be large enough. As you said, the two configs exist to separate "hard failures detected by heartbeat" from "soft failures like long GC / blocked on IO / simply processing a record takes a long time".
> >
> > BTW, I believe fully supporting KIP-62 is already on the kafka-python roadmap, and maybe Magnus (cc'ed) can explain a bit more about the timeline.
> >
> > Guozhang
> >
> > On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman <j...@netskope.com> wrote:
> >
> > > As far as I understood it, the primary thrust of KIP-62 was making it possible to issue heartbeats outside of the poll() loop, meaning that session.timeout.ms could be reduced below the time it takes a consumer to process a particular batch of messages.
> > >
> > > Unfortunately, while both librdkafka (which confluent-kafka-python relies on under the covers) and kafka-python support issuing heartbeats from a background thread, they both currently still tie that heartbeat to the poll() call. So the developer still has to manually tune max.poll.records and session.timeout.ms, whereas once background heartbeating is decoupled from polling, the defaults should be fine for more use cases.
> > >
> > > I actually filed tickets against both projects a few weeks ago for this:
> > > https://github.com/edenhill/librdkafka/issues/1039
> > > https://github.com/dpkp/kafka-python/issues/948
> > >
> > > On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hi Martin,
> > > >
> > > > Since 0.10.1, KIP-62 has been added to the consumer client, so the user does not need to manually call pause / resume.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > >
> > > > As for Python clients, as far as I know this background-thread approach has also been adopted in Confluent's open source confluent-kafka-python client:
> > > >
> > > > https://github.com/confluentinc/confluent-kafka-python
> > > >
> > > > So as long as you are willing to set `session.timeout.ms` high enough to cover the maximum processing latency of a single record, the background thread will be responsible for sending heartbeats, and users do not need to worry about them.
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I'm processing messages from a single topic using kafka-python, but occasionally I have a message that might take a while to process, so I'd like to send heartbeats while processing that message.
> > > > >
> > > > > My program works something like this:
> > > > >
> > > > >     consumer = KafkaConsumer(...)
> > > > >     for message in consumer:
> > > > >         if should_store(message.value):
> > > > >             store_state(message.value)
> > > > >         elif should_process(message.value):
> > > > >             # This loop might take a while (longer than the heartbeat interval)
> > > > >             for value in stored_state(message.value):
> > > > >                 do_something(value)
> > > > >
> > > > > I think I need to call consumer.client.poll() at regular intervals to send the heartbeats, so the code would look like this:
> > > > >
> > > > >     consumer = KafkaConsumer(...)
> > > > >     for message in consumer:
> > > > >         if should_store(message.value):
> > > > >             store_state(message.value)
> > > > >         elif should_process(message.value):
> > > > >             # This loop might take a while (longer than the heartbeat interval)
> > > > >             for index, value in enumerate(stored_state(message.value)):
> > > > >                 do_something(value)
> > > > >                 # Give the client a chance to send heartbeats every 10000 values
> > > > >                 if index % 10000 == 0:
> > > > >                     consumer.client.poll()
> > > > >
> > > > > Is calling KafkaClient.poll() like this safe to do? The documentation for KafkaConsumer.poll() says it is incompatible with the iterator interface, but the KafkaClient.poll() documentation says nothing like that.
> > > > >
> > > > > Also, there is KafkaConsumer.pause(). Do I need to pause the partitions I'm fetching from before calling consumer.client.poll()? Based on what I have seen in the code, it looks like calling pause() will discard the messages buffered for that partition so far and then fetch them again when resume() is called. Is that correct? If so, I'd rather not call pause() unless it is necessary.
> > > > >
> > > > > Thanks for the clarification.
> > > > >
> > > > > Best Regards,
> > > > > Martin
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang
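The `index % 10000 == 0` check in the original loop spaces the client calls by item count, so the real gap between heartbeat opportunities depends on how long each `do_something(value)` happens to take. Below is a minimal sketch of a time-based alternative, assuming the heartbeat-triggering call can be passed in as a plain callable. `with_periodic_callback` is a hypothetical helper, not part of kafka-python, and it does not answer the open question in this thread of whether calling `consumer.client.poll()` mid-message is safe.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def with_periodic_callback(
    items: Iterable[T],
    callback: Callable[[], None],
    interval_s: float,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[T]:
    """Yield items unchanged, calling `callback` between items at most once
    per `interval_s` seconds of elapsed time.

    Hypothetical helper for illustration. The callback only runs between
    items, so one slow item still delays it; `clock` is injectable for tests.
    """
    last_call = clock()
    for item in items:
        yield item  # the caller processes the item here
        now = clock()
        if now - last_call >= interval_s:
            callback()  # e.g. whatever call lets the client send heartbeats
            last_call = now
```

In the original loop this would wrap `stored_state(message.value)`, e.g. `for value in with_periodic_callback(stored_state(message.value), heartbeat, interval_s=1.0):`, where `heartbeat` stands for whichever client call turns out to be safe. A single `do_something(value)` that runs longer than the session timeout still cannot be rescued this way, since the callback only runs between values.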
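The distinction Guozhang draws between "hard failures detected by heartbeat" and "soft failures" is what KIP-62's two timeouts encode. The toy model below is a simplified, hypothetical sketch of that behavior, not the Java client's code: a background thread keeps heartbeating while the process is alive, so `session.timeout.ms` only has to cover crashes, and it stops and leaves the group once the application has gone longer than `max.poll.interval.ms` without polling. Times are plain integers in milliseconds, and the class and method names are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class GroupMemberModel:
    """Toy model of KIP-62 liveness (all times in milliseconds).

    Hypothetical illustration, not client code:
      * session_timeout_ms: the coordinator evicts a member that sends no
        heartbeat for this long (hard failure: crash, lost network).
      * max_poll_interval_ms: the background heartbeat thread gives up and
        leaves the group if the application has not polled for this long
        (soft failure: stuck or very slow processing).
    """

    session_timeout_ms: int
    max_poll_interval_ms: int
    last_poll_ms: int = 0
    last_heartbeat_ms: int = 0
    left_group: bool = False

    def on_poll(self, now_ms: int) -> None:
        """Application thread called poll(); processing is making progress."""
        self.last_poll_ms = now_ms

    def heartbeat_tick(self, now_ms: int) -> None:
        """One wake-up of the background heartbeat thread."""
        if self.left_group:
            return
        if now_ms - self.last_poll_ms > self.max_poll_interval_ms:
            # Processing looks stuck: leave so the partitions get reassigned.
            self.left_group = True
        else:
            self.last_heartbeat_ms = now_ms

    def is_member(self, now_ms: int) -> bool:
        """Whether the coordinator still counts this consumer as alive."""
        return (not self.left_group
                and now_ms - self.last_heartbeat_ms <= self.session_timeout_ms)
```

With a 10 s session timeout and a 5 min poll interval, a record that takes two minutes keeps the member in the group, a crashed process is noticed within about 10 s, and a consumer stuck indefinitely still leaves once the poll interval has passed.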
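If the fallback of raising the session timeout is taken, a few constraints are worth checking together. The sketch below is a hypothetical helper that encodes three of them: the Kafka documentation's guidance that `heartbeat.interval.ms` be lower than `session.timeout.ms` and typically no higher than a third of it; the broker-side `group.min.session.timeout.ms` / `group.max.session.timeout.ms` range that a requested session timeout must fall into; and, following Guozhang's suggestion, that the session timeout cover the slowest single record while heartbeats are still tied to polling. Broker limits are passed in rather than assumed, since they depend on the broker's configuration, and the record-latency check is only a rough one.

```python
from typing import List


def check_session_settings(
    session_timeout_ms: int,
    heartbeat_interval_ms: int,
    slowest_record_ms: int,
    broker_min_session_ms: int,
    broker_max_session_ms: int,
) -> List[str]:
    """Return human-readable problems with a consumer's timeout settings.

    Hypothetical helper; an empty list means no problem was found.
    """
    problems = []
    if not broker_min_session_ms <= session_timeout_ms <= broker_max_session_ms:
        problems.append(
            f"session timeout {session_timeout_ms} ms is outside the broker's "
            f"allowed range [{broker_min_session_ms}, {broker_max_session_ms}]")
    if heartbeat_interval_ms >= session_timeout_ms:
        problems.append("heartbeat interval must be lower than the session timeout")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append("heartbeat interval is above 1/3 of the session timeout")
    if slowest_record_ms >= session_timeout_ms:
        problems.append(
            "with heartbeats tied to polling, the slowest record "
            "would outlast the session timeout")
    return problems
```

In kafka-python the first two values correspond to the `session_timeout_ms` and `heartbeat_interval_ms` arguments of `KafkaConsumer`.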