Hello Jeff,

You are right that kafka-python currently does not expose two configs,
i.e. session.timeout.ms and max.poll.interval.ms, as the Java client
does, but I think the current setting may be sufficient to tackle
Martin's issue alone, as long as session.timeout.ms can be tuned to be
large enough. Like you said, the two configs exist to separate "hard
failures detected by heartbeat" from "soft failures like a long GC
pause, being blocked on IO, or simply taking a long time to process a
record".
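
To make the distinction between the two configs concrete, here is a toy model of the two checks. It is only a sketch of the idea behind KIP-62, not code from the broker or from any client; the class `ConsumerTimeouts` and the function `classify` are names made up for this illustration.

```python
from dataclasses import dataclass


@dataclass
class ConsumerTimeouts:
    """Hypothetical container for the two KIP-62 style limits."""
    session_timeout_ms: int     # longest tolerated gap between heartbeats
    max_poll_interval_ms: int   # longest tolerated gap between poll() calls


def classify(timeouts, ms_since_heartbeat, ms_since_poll):
    """Say which kind of failure, if any, the two limits would flag."""
    if ms_since_heartbeat > timeouts.session_timeout_ms:
        # No heartbeat at all: the process or its network is likely gone.
        return "hard failure"
    if ms_since_poll > timeouts.max_poll_interval_ms:
        # Heartbeats still arrive, but the application stopped polling.
        return "soft failure"
    return "healthy"


limits = ConsumerTimeouts(session_timeout_ms=10_000,
                          max_poll_interval_ms=300_000)

# Background heartbeats keep flowing while one record takes 60 s to process.
print(classify(limits, ms_since_heartbeat=500, ms_since_poll=60_000))

# Heartbeats tied to poll(): the same 60 s of processing also means 60 s
# without a heartbeat, so the session timeout fires.
print(classify(limits, ms_since_heartbeat=60_000, ms_since_poll=60_000))
```

The second call is the single-timeout situation discussed in this thread: when heartbeats only go out from poll(), session.timeout.ms has to cover the longest processing gap by itself.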

BTW I believe fully supporting KIP-62 is already on the roadmap of
kafka-python, and maybe Magnus (cc'ed) can explain a bit more about its
timeline.

Guozhang

On Tue, Feb 21, 2017 at 12:06 PM, Jeff Widman <j...@netskope.com> wrote:

> As far as I understood it, the primary thrust of KIP-62 was making it so
> heartbeats could be issued outside of the poll() loop, meaning that the
> session.timeout.ms could be reduced below the length of time it takes a
> consumer to process a particular batch of messages.
>
> Unfortunately, while both librdkafka (which confluent-kafka-python relies
> on under the covers) and kafka-python support issuing heartbeats from a
> background thread, they both currently still tie that heartbeat to the
> poll() call. So the developer still has to manually tune max.poll.records
> and session.timeout.ms, whereas once this background heartbeating is
> decoupled from polling, the defaults should be fine for more use-cases.
>
> I actually filed tickets against both projects a few weeks ago for this:
> https://github.com/edenhill/librdkafka/issues/1039
> https://github.com/dpkp/kafka-python/issues/948
>
> On Tue, Feb 21, 2017 at 11:01 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hi Martin,
> >
> > Since 0.10.1, KIP-62 has been added to the consumer client, so the
> > user does not need to manually call pause / resume.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> >
> > As for the Python client, as far as I know this background thread
> > approach has also been adopted in Confluent's open source Python
> > client:
> >
> > https://github.com/confluentinc/confluent-kafka-python
> >
> > So as long as you are willing to set `session.timeout.ms` high
> > enough to cover the maximum processing latency of a single record, the
> > background thread will be responsible for sending heartbeats and hence
> > users do not need to worry about them.
> >
> > Guozhang
> >
> > On Tue, Feb 21, 2017 at 7:24 AM, Martin Sucha <msu...@exponea.com>
> > wrote:
> >
> > > Hello,
> > >
> > > I'm processing messages using kafka-python from a single topic, but
> > > occasionally I have a message that might take a while to process, so
> > > I'd like to send heartbeats while processing the message.
> > >
> > > My program works something like this:
> > >
> > > consumer = KafkaConsumer(...)
> > > for message in consumer:
> > >     if should_store(message.value):
> > >         store_state(message.value)
> > >     elif should_process(message.value):
> > >         # This loop might take a while (longer than heartbeat interval)
> > >         for value in stored_state(message.value):
> > >             do_something(value)
> > >
> > > I think I need to call consumer.client.poll() at regular intervals to
> > > send the heartbeats, so the code would look like this:
> > >
> > > consumer = KafkaConsumer(...)
> > > for message in consumer:
> > >     if should_store(message.value):
> > >         store_state(message.value)
> > >     elif should_process(message.value):
> > >         # This loop might take a while (longer than heartbeat interval)
> > >         for index, value in enumerate(stored_state(message.value)):
> > >             do_something(value)
> > >             if index % 10000 == 0:
> > >                 consumer.client.poll()
> > >
> > > Is calling KafkaClient.poll() like this safe to do?
> > > The documentation for KafkaConsumer.poll() says it is incompatible
> > > with the iterator interface, but nothing like that is in the
> > > KafkaClient.poll() documentation.
> > >
> > > Also, there is KafkaConsumer.pause(). Do I need to pause the
> > > partitions I'm fetching from before calling consumer.client.poll()?
> > > Based on what I have seen in the code, it looks like calling pause()
> > > will discard the buffered messages fetched for that partition so far
> > > and then fetch them again when calling resume(). Is that correct? In
> > > this case I'd rather not call pause() if it is not necessary.
> > >
> > > Thanks for clarification.
> > >
> > > Best Regards,
> > > Martin
> > >
> >
> > --
> > -- Guozhang
>
--
-- Guozhang
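
The advice quoted in this thread, to set session.timeout.ms high enough to cover the maximum processing latency of a single record, can be turned into a small calculation. The sketch below is one possible way to do it, not an official recipe. The helper `consumer_settings`, the safety factor, the 10-second floor and the request-timeout margin are all assumptions made for illustration. The dictionary keys are existing kafka-python `KafkaConsumer` keyword arguments. Keeping the heartbeat interval at no more than one third of the session timeout follows the general Kafka guidance, and at least some kafka-python versions also require request_timeout_ms to be larger than session_timeout_ms.

```python
def consumer_settings(max_record_latency_s, records_per_poll=1,
                      safety_factor=2.0):
    """Derive KafkaConsumer keyword arguments from a worst-case latency.

    Hypothetical helper: it assumes heartbeats are only sent from poll(),
    so the session timeout must outlast the slowest possible batch.
    """
    if max_record_latency_s <= 0 or records_per_poll < 1:
        raise ValueError("latency must be positive and batch size >= 1")

    # Worst case: every record in one poll() batch is as slow as possible.
    worst_case_ms = int(max_record_latency_s * records_per_poll
                        * 1000 * safety_factor)
    # Arbitrary floor so that very fast workloads do not get a tiny timeout.
    session_timeout_ms = max(worst_case_ms, 10_000)

    return {
        "max_poll_records": records_per_poll,
        "session_timeout_ms": session_timeout_ms,
        "heartbeat_interval_ms": session_timeout_ms // 3,
        # Margin chosen for illustration only.
        "request_timeout_ms": session_timeout_ms + 10_000,
    }


settings = consumer_settings(max_record_latency_s=30)
print(settings)

# With kafka-python installed and a broker available, the result could be
# passed on like this ("my-topic" is a placeholder):
# consumer = KafkaConsumer("my-topic", **settings)
```

Note that max_poll_records limits the number of records returned by a single poll() call, and that the broker only accepts session timeouts within its group.min.session.timeout.ms and group.max.session.timeout.ms bounds, so a large value may also need a broker-side change.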