We are using Kafka 0.10.1.0 (server) and the Java client API (the new API) for consumers. One issue we have been running into is that the coordinator considers the consumer "dead" because of a lack of activity within a specific period of time, even though the consumer is in fact still alive. We see exceptions like this:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.


I understand what that exception means and what we could potentially do to address it (setting a low value for max.poll.records is one option). Before changing max.poll.records in our setup, though, I would like to understand this a bit better, so that I know whether that is the right fix given the way we have implemented our consumers. Essentially, our consumer code is this:
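For reference, both settings involved are ordinary consumer properties. A minimal sketch using only java.util.Properties; the value 50 for max.poll.records is purely illustrative, and the comments give what should be the Kafka 0.10.1.0 defaults (500 records, 300000 ms):

```java
import java.util.Properties;

// Sketch: the consumer settings behind the poll-interval check (0.10.1.0 key names).
// The class and method names are hypothetical helpers for illustration only.
class PollTuningSketch {
    static Properties pollTuning(int maxPollRecords) {
        Properties props = new Properties();
        // Upper bound on the number of records a single poll() returns (default 500).
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        // Maximum time allowed between poll() calls before the consumer is
        // considered failed (default 300000 ms); set explicitly to the default here.
        props.setProperty("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        // 50 is an illustrative value, not a recommendation.
        Properties props = pollTuning(50);
        System.out.println(props.getProperty("max.poll.records")); // 50
    }
}
```

These entries would be merged with the usual bootstrap.servers, group.id and deserializer settings before constructing the KafkaConsumer.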

            while (!stopped) {
                try {
                    final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
                    for (final TopicPartition topicPartition : consumerRecords.partitions()) {
                        if (stopped) {
                            break;
                        }
                        for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
                            final long previousOffset = consumerRecord.offset();
                            // commit the offset and then pass on the message for processing (in a separate thread)
                            consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(previousOffset + 1)));

                            this.executor.execute(new Runnable() {
                                @Override
                                public void run() {
                                    // process the ConsumerRecord
                                }
                            });
                        }
                    }
                } catch (Exception e) {
                    // log the error and continue
                    continue;
                }
            }
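Since each commitSync() is a blocking round trip to the group coordinator, a common variation on a loop like this commits once per poll() batch instead of once per record, using the commitSync(Map<TopicPartition, OffsetAndMetadata>) overload. The sketch below only builds the offset map; Rec is a hypothetical stand-in for ConsumerRecord, and partitions are plain strings instead of TopicPartition so that it runs without the Kafka client on the classpath:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: one commit position per partition for a whole poll() batch.
// Rec is a hypothetical stand-in for ConsumerRecord; partitions are plain
// strings here instead of TopicPartition so no Kafka dependency is needed.
class BatchCommitSketch {
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Maps each partition to (highest offset in the batch + 1), i.e. the
    // position of the next record to consume, which is what gets committed.
    static Map<String, Long> offsetsToCommit(List<Rec> batch) {
        Map<String, Long> next = new TreeMap<>();
        for (Rec r : batch) {
            // Long::max keeps the larger position if a partition appears more than once.
            next.merge(r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("events-0", 41), new Rec("events-0", 42), new Rec("events-1", 7));
        System.out.println(offsetsToCommit(batch)); // {events-0=43, events-1=8}
    }
}
```

In a real loop the map would use TopicPartition keys and OffsetAndMetadata values. Whether that single commit happens before or after handing the batch to the executor decides whether a crash can skip or redeliver records, just as the placement of the per-record commit does.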



As you can see, the only thing that happens on the main thread (the one the consumer polls on) is a commitSync() for each record returned by that poll(). I understand that commitSync() is blocking, so the time spent in these invocations can add up and lengthen the interval between successive poll() calls. One option is to use commitAsync() instead, but we need to evaluate whether it has other issues in our use case.
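A rough budget shows how much per-record commit latency it takes to trip the check. The sketch assumes the only work between two poll() calls is one blocking commit per record and ignores fetch time and anything else on that thread; the class and method names are hypothetical:

```java
// Rough poll-interval budget: if every record costs one blocking commitSync()
// on the polling thread, a full batch takes about records * avgCommitLatencyMs
// between two poll() calls. Everything else on that thread is ignored.
class PollBudgetSketch {
    // Average commit latency above which a full batch overruns max.poll.interval.ms.
    static double maxAvgCommitLatencyMs(long maxPollIntervalMs, int maxPollRecords) {
        return (double) maxPollIntervalMs / maxPollRecords;
    }

    // True if committing `records` records one by one would exceed the interval.
    static boolean overruns(long maxPollIntervalMs, int records, double avgCommitLatencyMs) {
        return records * avgCommitLatencyMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // 0.10.1.0 defaults: max.poll.interval.ms = 300000, max.poll.records = 500.
        System.out.println(maxAvgCommitLatencyMs(300_000, 500)); // 600.0
        System.out.println(overruns(300_000, 500, 700.0));       // true
    }
}
```

With the defaults, a full batch would need commits averaging more than 600 ms each to exceed five minutes, so measuring the actual commit latency on the polling thread is probably worthwhile before relying on max.poll.records alone.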

What I was wondering, though, is why commitSync() doesn't count towards the consumer being considered alive. If it did, I see no reason why this consumer would ever be considered dead and the above message logged. Does anyone see a problem with the code above?
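Roughly, the 0.10.1.0 consumer (KIP-62) has two separate liveness checks: heartbeats, sent from a background thread, are checked against session.timeout.ms, while max.poll.interval.ms is measured only between poll() calls, so a commitSync() resets neither. Despite its mention of the session timeout, the exception above relates to the second check. The toy model below (a hypothetical class, not Kafka's code) simulates a loop that heartbeats and commits for 400 s without polling: its session stays valid, but its poll interval expires:

```java
// Toy model of the two liveness checks in the 0.10.1.0 consumer.
// This is NOT Kafka's implementation; all names here are hypothetical.
class LivenessModel {
    private final long sessionTimeoutMs;
    private final long maxPollIntervalMs;
    private long lastHeartbeatMs;
    private long lastPollMs;

    LivenessModel(long sessionTimeoutMs, long maxPollIntervalMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastHeartbeatMs = nowMs;
        this.lastPollMs = nowMs;
    }

    // Sent periodically by a background thread, independent of the poll loop.
    void heartbeat(long nowMs) { lastHeartbeatMs = nowMs; }

    // Only poll() resets the poll-interval timer.
    void poll(long nowMs) { lastPollMs = nowMs; }

    // In this model a commit touches neither timer.
    void commitSync(long nowMs) { }

    boolean sessionExpired(long nowMs) { return nowMs - lastHeartbeatMs > sessionTimeoutMs; }

    boolean pollIntervalExceeded(long nowMs) { return nowMs - lastPollMs > maxPollIntervalMs; }

    public static void main(String[] args) {
        LivenessModel m = new LivenessModel(10_000, 300_000, 0);
        // 400 s of commits with a heartbeat every 3 s, but no poll().
        for (long t = 0; t <= 400_000; t += 3_000) {
            m.heartbeat(t);
            m.commitSync(t);
        }
        System.out.println(m.sessionExpired(400_000));       // false
        System.out.println(m.pollIntervalExceeded(400_000)); // true
    }
}
```

In the real client, exceeding max.poll.interval.ms makes the consumer leave the group, which triggers the rebalance that later surfaces as the CommitFailedException.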

P.S.: We use the default session timeout in the consumer configs (i.e. we don't set session.timeout.ms explicitly).


-Jaikiran
