We are using Kafka 0.10.1.0 (server) and the new Java client API for
consumers. One issue we keep running into is that the coordinator
considers a consumer "dead" because of a lack of activity within a
specific period of time, when in reality the consumer is still alive.
We see exceptions like this:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
I understand what that exception means and what we could potentially do
to address it (setting a low value for max.poll.records is one option).
Before changing max.poll.records in our setup, I would like to
understand this a bit better, so that I know it is the right fix given
the way we have implemented our consumers.
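For reference, a minimal sketch of where the two settings named in the exception message would go. The class name, method name and values are placeholders for illustration, not what we actually run, and the other required consumer settings (bootstrap.servers, group.id, deserializers) are left out:

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds only the two poll-related settings discussed above.
    // The values passed in are illustrative placeholders.
    static Properties pollSettings(int maxPollRecords, int maxPollIntervalMs) {
        final Properties props = new Properties();
        // Upper bound on the number of records returned by one poll()
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        // Maximum allowed delay between two poll() calls before the
        // consumer is removed from the group
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        final Properties props = pollSettings(50, 300000);
        System.out.println(props);
    }
}
```

These properties would be merged into whatever Properties object is passed to the KafkaConsumer constructor.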
Essentially, our consumer code is this:
    while (!stopped) {
        try {
            final ConsumerRecords<K, V> consumerRecords =
                    consumer.poll(someValue);
            for (final TopicPartition topicPartition :
                    consumerRecords.partitions()) {
                if (stopped) {
                    break;
                }
                for (final ConsumerRecord<K, V> consumerRecord
                        : consumerRecords.records(topicPartition)) {
                    final long previousOffset = consumerRecord.offset();
                    // commit the offset and then pass on the message
                    // for processing (in a separate thread)
                    consumer.commitSync(Collections.singletonMap(
                            topicPartition,
                            new OffsetAndMetadata(previousOffset + 1)));
                    this.executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            // process the ConsumerRecord
                        }
                    });
                }
            }
        } catch (Exception e) {
            // log the error and continue
            continue;
        }
    }
As you can see, the only thing that happens in the main thread (the one
the consumer polls on) is a commitSync for each record returned in that
poll batch. I understand commitSync is blocking, so the commitSync
invocations can potentially add up and lengthen the time between
successive poll() calls. One option is to use commitAsync, but we need
to evaluate whether it causes other issues for our use case.
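To put rough numbers on how per-record commits could add up, here is a back-of-the-envelope sketch. The record counts and the per-commit latency are made-up figures, not measurements from our setup, and the class and method names are hypothetical:

```java
public class PollGapEstimate {

    // Rough time spent between two poll() calls if every record in the
    // batch triggers one blocking commit of the given latency.
    static long estimatedGapMs(int recordsPerPoll, long commitLatencyMs) {
        return (long) recordsPerPoll * commitLatencyMs;
    }

    // True if that estimated gap is longer than max.poll.interval.ms.
    static boolean exceedsInterval(int recordsPerPoll, long commitLatencyMs,
                                   long maxPollIntervalMs) {
        return estimatedGapMs(recordsPerPoll, commitLatencyMs) > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        final long maxPollIntervalMs = 300000L; // assumed value, for illustration
        final long commitLatencyMs = 20L;       // assumed value, for illustration
        for (final int records : new int[] {500, 5000, 20000}) {
            System.out.println(records + " records -> "
                    + estimatedGapMs(records, commitLatencyMs) + " ms, exceeds: "
                    + exceedsInterval(records, commitLatencyMs, maxPollIntervalMs));
        }
    }
}
```

The estimate ignores everything except the commits themselves, so it is only a lower bound on the real gap between polls.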
But what I was wondering is: why doesn't commitSync count towards the
consumer being considered alive? If it did, I see no reason why this
consumer would ever be considered dead and the above message logged.
Does anyone see a problem with the code above?
P.S: We use the default session timeout value in the consumer configs
(i.e. we don't set any specific value)
-Jaikiran