Hello, I'm processing messages from a single topic using kafka-python. Occasionally a message takes a long time to process, so I'd like to keep sending heartbeats while I'm processing it.
My program works something like this:

    consumer = KafkaConsumer(...)

    for message in consumer:
        if should_store(message.value):
            store_state(message.value)
        elif should_process(message.value):
            # This loop might take a while (longer than heartbeat interval)
            for value in stored_state(message.value):
                do_something(value)

I think I need to call consumer.client.poll() at regular intervals to send the heartbeats, so the code would look like this:

    consumer = KafkaConsumer(...)

    for message in consumer:
        if should_store(message.value):
            store_state(message.value)
        elif should_process(message.value):
            # This loop might take a while (longer than heartbeat interval)
            for index, value in enumerate(stored_state(message.value)):
                do_something(value)
                if index % 10000 == 0:
                    consumer.client.poll()

Is calling KafkaClient.poll() like this safe to do? The documentation for KafkaConsumer.poll() says it is incompatible with the iterator interface, but nothing like that is in the KafkaClient.poll() documentation.

Also, there is KafkaConsumer.pause(). Do I need to pause the partitions I'm fetching from before calling consumer.client.poll()? Based on what I have seen in the code, it looks like calling pause() will discard the buffered messages fetched for that partition so far and then fetch them again when calling resume(). Is that correct? In this case I'd rather not call pause() if it is not necessary.

Thanks for clarification.

Best Regards,
Martin
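
For illustration, the periodic call in the second snippet could also be triggered by elapsed time instead of by item count, so that the gap between calls does not depend on how long each do_something() takes. The following is only a minimal sketch: the `Throttle` class, its `interval_s` parameter, and the injectable `clock` are hypothetical names introduced here, and whether consumer.client.poll() is the right thing to call at all remains the open question above.

```python
import time


class Throttle:
    """Hypothetical helper: run a callback at most once per interval_s seconds."""

    def __init__(self, callback, interval_s, clock=time.monotonic):
        self._callback = callback
        self._interval_s = interval_s
        self._clock = clock          # injectable so the helper can be tested
        self._last = clock()         # time of the last callback (or creation)

    def tick(self):
        """Call the callback if the interval has elapsed; return True if it ran."""
        now = self._clock()
        if now - self._last >= self._interval_s:
            self._last = now
            self._callback()
            return True
        return False


# Assumed usage inside the processing loop (not verified to be safe):
#
#     keepalive = Throttle(consumer.client.poll, interval_s=1.0)
#     for value in stored_state(message.value):
#         do_something(value)
#         keepalive.tick()
```

Using a monotonic clock keeps the interval unaffected by system time adjustments; the one-second interval in the usage comment is an arbitrary placeholder, not a recommended value.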