Jason Gustafson created KAFKA-10123:
---------------------------------------
Summary: Regression resetting offsets in consumer when fetching from old broker
Key: KAFKA-10123
URL: https://issues.apache.org/jira/browse/KAFKA-10123
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: David Arthur
Fix For: 2.6.0
We saw this error in system tests:
{code}
java.lang.NullPointerException
    at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
    at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}
The logs show that the consumer was in the middle of an offset reset when
this happened. We changed the logic in KAFKA-9724 to include the following
check:
{code}
NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
    return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
} else {
    // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
    completeValidation(tp);
    return false;
}
{code}
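To make the branching concrete, here is a minimal, self-contained Java sketch of the same gate. It is not the real `Fetcher` code: `ValidationGate`, `shouldValidate`, and the plain map standing in for `ApiVersions` are hypothetical, and the cutoff of OffsetsForLeaderEpoch v3 is an assumption about what `hasUsableOffsetForLeaderEpochVersion` checks. A `false` result corresponds to the shortcut path that calls `completeValidation(tp)`.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the KAFKA-9724 validation gate; not Kafka's Fetcher code.
class ValidationGate {
    // Assumption: OffsetsForLeaderEpoch v3 or newer is needed for client-side validation.
    static final int MIN_USABLE_VERSION = 3;

    // 'versions' stands in for ApiVersions: node id -> latest usable
    // OffsetsForLeaderEpoch version known for that node.
    static boolean shouldValidate(Map<String, Integer> versions, String leaderId) {
        Integer version = versions.get(leaderId);
        // Unknown versions (null) still go through validation, as in the quoted
        // check; only a known-old broker takes the completeValidation shortcut.
        return version == null || version >= MIN_USABLE_VERSION;
    }

    public static void main(String[] args) {
        Map<String, Integer> versions = new HashMap<>();
        versions.put("1", 2); // old broker
        versions.put("2", 3); // newer broker
        System.out.println(shouldValidate(versions, "1")); // false: skip validation
        System.out.println(shouldValidate(versions, "2")); // true
        System.out.println(shouldValidate(versions, "3")); // true: versions not known yet
    }
}
{code}

The shortcut itself is harmless; the problem only appears when `completeValidation` is reached for a partition that has no position.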
The problem seems to be the shortcut call to `completeValidation`, which
executes the following logic:
{code}
if (hasPosition()) {
    transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
}
{code}
We should be protected by the call to `hasPosition` here, but `hasPosition`
incorrectly returns true when the partition is in the `AWAIT_RESET` state.
This causes us to enter the `FETCHING` state without a position, which
ultimately leads to the NPE in `Fetcher.prepareFetchRequests`.
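The failure can be reproduced with a toy model of the per-partition fetch state. This is a hedged sketch, not Kafka's `SubscriptionState`: `FetchState`, `PartitionState`, the `buggy` flag, and `fetchOffset` (a stand-in for the position lookup in `prepareFetchRequests`) are all hypothetical. With the buggy `hasPosition`, `completeValidation` moves an `AWAIT_RESET` partition to `FETCHING` with a null position and the next fetch throws an NPE. If `AWAIT_RESET` reports no position, which is the assumed shape of the fix, the partition stays in `AWAIT_RESET` until the reset completes.

{code:java}
// Hypothetical toy model of a partition's fetch state; not Kafka's SubscriptionState.
enum FetchState {
    INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION;

    // buggy == true reproduces the reported behaviour: AWAIT_RESET claims a position.
    // buggy == false is the assumed fix: only FETCHING and AWAIT_VALIDATION have one.
    boolean hasPosition(boolean buggy) {
        switch (this) {
            case FETCHING:
            case AWAIT_VALIDATION:
                return true;
            case AWAIT_RESET:
                return buggy;
            default:
                return false;
        }
    }
}

class PartitionState {
    FetchState state;
    Long position; // null while no position is known (e.g. during an offset reset)

    PartitionState(FetchState state, Long position) {
        this.state = state;
        this.position = position;
    }

    // Mirrors the quoted completeValidation: enter FETCHING only if there is a position.
    void completeValidation(boolean buggy) {
        if (state.hasPosition(buggy)) {
            state = FetchState.FETCHING;
        }
    }

    // Stand-in for prepareFetchRequests reading the fetch offset of a FETCHING partition.
    long fetchOffset() {
        if (state != FetchState.FETCHING) {
            throw new IllegalStateException("partition is not fetchable in state " + state);
        }
        return position; // unboxing a null Long throws NullPointerException
    }

    public static void main(String[] args) {
        PartitionState buggy = new PartitionState(FetchState.AWAIT_RESET, null);
        buggy.completeValidation(true);
        System.out.println(buggy.state); // FETCHING, with no position
        try {
            buggy.fetchOffset();
        } catch (NullPointerException e) {
            System.out.println("NPE, as in the report");
        }

        PartitionState fixed = new PartitionState(FetchState.AWAIT_RESET, null);
        fixed.completeValidation(false);
        System.out.println(fixed.state); // AWAIT_RESET: stays put until the reset completes
    }
}
{code}

Keeping the guard inside `completeValidation` and fixing the state's answer, rather than special-casing the caller, means every path that skips validation gets the same protection.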
--
This message was sent by Atlassian Jira
(v8.3.4#803005)