Hello,

I'm processing messages from a single topic using kafka-python, but
occasionally I get a message that can take a while to process, so I'd
like to send heartbeats while processing it.

My program works something like this:

from kafka import KafkaConsumer

consumer = KafkaConsumer(...)
for message in consumer:
    if should_store(message.value):
        store_state(message.value)
    elif should_process(message.value):
        # This loop might take a while (longer than the heartbeat interval)
        for value in stored_state(message.value):
            do_something(value)

I think I need to call consumer.client.poll() at regular intervals to send
the heartbeats, so the code would look like this:

consumer = KafkaConsumer(...)
for message in consumer:
    if should_store(message.value):
        store_state(message.value)
    elif should_process(message.value):
        # This loop might take a while (longer than the heartbeat interval)
        for index, value in enumerate(stored_state(message.value)):
            do_something(value)
            if index % 10000 == 0:
                consumer.client.poll()
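
A hedged sketch of one alternative to counting items: trigger the poll from elapsed time instead, so it happens at a steady rate no matter how long each do_something() call takes. The helper name with_periodic_callback and the 1-second interval in the usage below are hypothetical choices, not part of kafka-python; the interval would need to be comfortably shorter than the consumer's heartbeat interval. This only changes when poll() is called. Whether calling it from inside the iterator loop is safe at all is still the open question.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def with_periodic_callback(
    items: Iterable[T],
    callback: Callable[[], object],
    interval_s: float,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[T]:
    """Yield each item; after the caller finishes with an item, invoke
    callback() if at least interval_s seconds have passed since the
    previous invocation (or since iteration started)."""
    last = clock()
    for item in items:
        yield item  # the caller's loop body runs here
        now = clock()
        if now - last >= interval_s:
            callback()  # e.g. consumer.client.poll in the loop above
            last = now
```

In the loop above, this would replace the index check roughly like this:

        for value in with_periodic_callback(stored_state(message.value),
                                            consumer.client.poll, 1.0):
            do_something(value)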

Is it safe to call KafkaClient.poll() like this? The documentation for
KafkaConsumer.poll() says it is incompatible with the iterator interface,
but the KafkaClient.poll() documentation says nothing similar.

Also, there is KafkaConsumer.pause(). Do I need to pause the partitions I'm
fetching from before calling consumer.client.poll()? From what I have seen
in the code, calling pause() discards the messages already buffered for
that partition and then fetches them again when resume() is called. Is
that correct? If so, I'd rather not call pause() unless it is necessary.
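
Whether pausing discards already-buffered messages is exactly the open question here, so the following is only a minimal sketch of how pause() and resume() could be paired if pausing does turn out to be needed, so that partitions are resumed even if processing raises. paused_partitions is a hypothetical helper; it assumes KafkaConsumer's assignment(), paused(), pause(*partitions) and resume(*partitions) methods, and it skips partitions that were already paused so it does not resume them on exit.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol, Set


class PausableConsumer(Protocol):
    """The subset of the KafkaConsumer interface this sketch relies on."""

    def assignment(self) -> Set[object]: ...
    def paused(self) -> Set[object]: ...
    def pause(self, *partitions: object) -> None: ...
    def resume(self, *partitions: object) -> None: ...


@contextmanager
def paused_partitions(consumer: PausableConsumer) -> Iterator[Set[object]]:
    """Pause every assigned partition that is not already paused, and
    resume exactly those partitions on exit, even if the body raises."""
    partitions = set(consumer.assignment()) - set(consumer.paused())
    consumer.pause(*partitions)
    try:
        yield partitions
    finally:
        consumer.resume(*partitions)
```

Used around the slow loop, it would look roughly like this:

        with paused_partitions(consumer):
            for value in stored_state(message.value):
                do_something(value)
                consumer.client.poll()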

Thanks for any clarification.

Best Regards,
Martin
