Hi All,

I am hosting Kafka consumers inside a microservice deployed as Kubernetes
pods, with 3 consumers in a consumer group.
There is a requirement to add auto-scaling, where pods will be scaled out
or scaled in based on the load on the microservice.
So 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
scaled in to 2 or 1 pod.

Currently, I am using enable.auto.commit set to 'true' in the consumers,
and during scale-out or scale-in I want the offsets of polled and
processed records to be committed so that duplicates won't occur.
I have narrowed the problem down to 2 scenarios:

1. During a scale-in operation, I am adding a shutdown hook to the Java
Runtime and calling close() on the consumer. As per the Kafka docs, close()
allows up to 30 seconds to commit the current offsets if auto-commit is
enabled. So I assume it will let the current batch of polled records finish
processing within the 30-second timeout before committing offsets and
closing the consumer. Is my understanding correct?

public void close()

Close the consumer, waiting for up to the default timeout of 30 seconds
for any needed cleanup. If auto-commit is enabled, this will commit the
current offsets if possible within the default timeout. See close(Duration)
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-java.time.Duration->
for details. Note that wakeup()
<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup-->
cannot be used to interrupt close.
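For reference, here is a minimal sketch of the shutdown-hook approach for
scenario 1 (broker address, group id, and topic name are hypothetical). One
caveat the Javadoc hints at: close() cannot be interrupted by wakeup(), and
it commits the consumer's current position rather than waiting for your
processing code. The usual pattern is therefore to call wakeup() from the
hook, let the poll loop finish its in-flight batch, and only then call
close() from the loop's own thread:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");                // hypothetical group id
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        // Kubernetes sends SIGTERM during scale-in, which triggers this hook.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();         // break out of poll(); close() itself cannot be interrupted
            try {
                shutdownLatch.await(); // wait until the poll loop has closed the consumer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            consumer.subscribe(Collections.singletonList("my-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // the in-flight batch is finished before the next poll()
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to close()
        } finally {
            // Commits the current offsets within the default 30 s since auto-commit is enabled.
            consumer.close();
            shutdownLatch.countDown();
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s:%d:%d%n", record.topic(), record.partition(), record.offset());
    }
}
```

Because the WakeupException is only thrown from poll(), the batch returned
by the previous poll has already been fully processed by the time close()
runs, so the committed position matches what was actually processed.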

2. During a scale-out operation, a new pod (consumer) will be added to the
consumer group, so partitions will be rebalanced from the existing
consumers to the new consumer. In this case, I want to ensure that the
batch of records currently polled and being processed by a consumer is
fully processed and its offsets are committed before the partitions are
reassigned to the new consumer.
How can I ensure this with auto-commit enabled?
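One common approach to scenario 2 (not specific to this thread) is to pass
a ConsumerRebalanceListener to subscribe() and commit synchronously in
onPartitionsRevoked(), which the consumer invokes before partitions are
reassigned. This is usually paired with enable.auto.commit=false so that
exactly the processed offsets are committed. A sketch, with broker, group,
and topic names hypothetical:

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");                // hypothetical group id
        props.put("enable.auto.commit", "false");         // manual commits for precise control
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();

        consumer.subscribe(Collections.singletonList("my-topic"), // hypothetical topic
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Runs inside poll() before reassignment: commit what has been
                        // processed so the new owner resumes after the last processed record.
                        consumer.commitSync(processed);
                        processed.clear();
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Nothing needed; assigned partitions resume from committed offsets.
                    }
                });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // process(record) ...
                // Record the next offset to consume (processed offset + 1).
                processed.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            consumer.commitAsync(processed, null); // periodic commit between rebalances
        }
    }
}
```

Since the rebalance callback fires from within poll(), the batch from the
previous poll has always been fully processed by the time
onPartitionsRevoked() commits, which gives the before-reassignment
guarantee asked about above.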
