Hi Pushkar,

Here are the answers to your questions:
> 1. During scale-down operation, I am adding a shutdown hook to the Java
> Runtime, and calling close on the consumer. As per kafka docs, close
> provides 30 sec to commit current offsets if auto.commit is enabled: so, I
> assume that it will process the current batch of polled records within the
> 30 sec timeout before committing offsets and then close the consumer. Is my
> understanding correct?

No, the close() method only does some cleanup and commits offsets if needed.
It does not care whether the polled records have been processed or not.
So, to be clear, the 30 seconds is for the consumer to:
(1) commit offsets if auto.commit is enabled
(2) leave the consumer group
(3) do other cleanup
If you want the in-flight batch to finish processing before the consumer
closes, you have to arrange that yourself in the poll loop (there's a small
sketch at the bottom of this mail).

> 2. During scale out operation, new pod (consumer) will be added to the
> consumer group, so partitions of existing consumers will be rebalanced to
> the new consumer. In this case, I want to ensure that the current batch of
> records polled and being processed by the consumer is processed and offsets
> are committed before partition rebalance happens to the new consumer. How
> can I ensure this with auto-commit enabled?

It depends on which version of Kafka you're running, and which
`partition.assignment.strategy` you have set.
In Kafka v3.2.1, we found a bug where there is a chance of processing
duplicate records during a rebalance:
KAFKA-14196 <https://issues.apache.org/jira/browse/KAFKA-14196>
So, assuming you're using the default `partition.assignment.strategy`
setting and are not on v3.2.1, we can ensure there will be no duplicate
consumption.
If you set `partition.assignment.strategy` to CooperativeStickyAssignor,
there's a bug that we're still working on:
KAFKA-14224 <https://issues.apache.org/jira/browse/KAFKA-14224>
(There's also a short config sketch at the bottom of this mail showing where
that setting goes.)

Thank you.
Luke

On Wed, Sep 14, 2022 at 3:09 PM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Hi All,
>
> I am hosting kafka consumers inside a microservice hosted as kubernetes
> pods, 3 consumers in a consumer group.
> There is a requirement to add auto-scaling where there will be a single
> pod which will be auto-scaled out or scaled in based on the load on the
> microservice.
> So, 1 pod can be scaled out to 2 or 3 pods, and similarly 3 pods can be
> scaled down to 2 or 1 pod.
>
> Currently, I am using enable.auto.commit set to 'true' in the consumers
> and during scale out or scale-in, I want to commit offsets of polled and
> processed records so duplicates won't occur.
> I have narrowed the problem down to 2 scenarios:
>
> 1. During scale-down operation, I am adding a shutdown hook to the Java
> Runtime, and calling close on the consumer. As per kafka docs, close
> provides 30 sec to commit current offsets if auto.commit is enabled: so, I
> assume that it will process the current batch of polled records within the
> 30 sec timeout before committing offsets and then close the consumer. Is my
> understanding correct?
>
> public void close()
>
> Close the consumer, waiting for up to the default timeout of 30 seconds
> for any needed cleanup. If auto-commit is enabled, this will commit the
> current offsets if possible within the default timeout. See close(Duration)
> <https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close-java.time.Duration->
> for details. Note that wakeup()
> <https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup-->
> cannot be used to interrupt close.
>
> 2. During scale out operation, new pod (consumer) will be added to the
> consumer group, so partitions of existing consumers will be rebalanced to
> the new consumer. In this case, I want to ensure that the current batch of
> records polled and being processed by the consumer is processed and offsets
> are committed before partition rebalance happens to the new consumer.
> How can I ensure this with auto-commit enabled?
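
As a rough sketch for point 1 (the topic, group id, bootstrap servers and
timeouts below are just made-up placeholders, not recommendations): since
close() won't wait for record processing, the usual pattern is to have the
shutdown hook call wakeup(), let the poll loop finish the batch it already
holds, and only then call close().

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulShutdownConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");        // placeholder
        props.put("group.id", "my-service");                     // placeholder
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
        Thread pollThread = Thread.currentThread();

        // Shutdown hook: wakeup() makes a blocked poll() throw WakeupException,
        // then we wait for the poll loop to finish before the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                pollThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // your business logic
                }
            }
        } catch (WakeupException e) {
            // Expected on shutdown. The batch returned by the previous poll()
            // has already gone through the loop above, so it is fully processed.
        } finally {
            // Commits the current offsets (auto-commit) and leaves the group,
            // waiting at most 10 seconds for that cleanup.
            consumer.close(Duration.ofSeconds(10));
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for the real processing
    }
}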
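
And for point 2, just to show where that setting lives (this is only an
illustration; with the default strategy you don't need to set it at all):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class AssignmentStrategyConfig {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // Only set this if you deliberately opt into cooperative rebalancing;
        // that path is the one affected by KAFKA-14224 mentioned above.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        return props;
    }
}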