For reference, here's the complete stack trace (it's triggered when
commitSync gets called):
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.*commitSync*(KafkaConsumer.java:1104) ~[kafka-clients-0.10.1.0.jar!/:na]
-Jaikiran
On Tuesday 01 November 2016 07:39 PM, Jaikiran Pai wrote:
We are using Kafka 0.10.1.0 (server) and the Java client API (the new
consumer API) for our consumers. One of the issues we have been running
into is that the consumer is considered "dead" by the group co-ordinator
because of a lack of activity within a specific period of time, even
though the consumer is in fact still alive. We see exceptions like this:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
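For context, these are the consumer properties that message is talking
about; here is a minimal sketch of where they would be set (the values
below are purely illustrative assumptions, not what we actually run with
- as noted in the P.S., we haven't changed these defaults):

    // imports assumed: java.util.Properties, org.apache.kafka.clients.consumer.KafkaConsumer
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // assumption: broker address
    props.put("group.id", "my-consumer-group");         // assumption: group name
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // the knobs mentioned in the error message (illustrative values only):
    props.put("max.poll.records", "100");        // cap on the batch size returned by a single poll()
    props.put("max.poll.interval.ms", "300000"); // max allowed gap between poll() calls (new in 0.10.1.0)
    props.put("session.timeout.ms", "10000");    // heartbeat-based liveness timeout
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);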
I understand what that exception means and what we could potentially do
to address it (setting a low value for max.poll.records is one option).
Before changing the max.poll.records value in our setup, though, I would
like to hear/understand a bit more about this, so that I know it is the
right fix for the way we have implemented our consumers. Essentially,
our consumer code is this:
while (!stopped) {
    try {
        final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
        for (final TopicPartition topicPartition : consumerRecords.partitions()) {
            if (stopped) {
                break;
            }
            for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
                final long previousOffset = consumerRecord.offset();
                // commit the offset and then pass on the message for processing (in a separate thread)
                consumer.commitSync(Collections.singletonMap(topicPartition,
                        new OffsetAndMetadata(previousOffset + 1)));
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // process the ConsumerRecord
                    }
                });
            }
        }
    } catch (Exception e) {
        // log the error and continue
        continue;
    }
}
As you can see, the only thing the main thread (the one the consumer is
polling on) does is a commitSync for each record returned in that batch
of poll(). I understand commitSync is blocking, so each commitSync
invocation can add to the time between poll() calls. One option is using
commitAsync (a rough sketch of that change is below), but we need to
evaluate whether it has other issues for our use case.
But what I was wondering is: why doesn't commitSync count towards the
consumer being considered alive? If it did, then I see no reason why
this consumer would ever be considered dead and that message above
logged. Does anyone see a problem with the code above?
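For illustration, this is roughly what the commitAsync variant of the
inner loop would look like (a sketch only - the callback and its error
handling are assumptions on my part, not something we have tested):

    // inside the inner loop, instead of consumer.commitSync(...):
    consumer.commitAsync(
            Collections.singletonMap(topicPartition, new OffsetAndMetadata(previousOffset + 1)),
            new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        // log and decide whether to retry or ignore (assumption: logging is enough for us)
                    }
                }
            });

The main difference is that commitAsync returns immediately, so it
doesn't widen the gap between poll() calls the way commitSync can, but
commit failures are only reported via the callback.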
P.S.: We use the default session timeout value in the consumer configs
(i.e., we don't set any specific value).
-Jaikiran