David Jacot created KAFKA-13266:
-----------------------------------
Summary: `InitialFetchState` should be created after partition is removed from the fetchers
Key: KAFKA-13266
URL: https://issues.apache.org/jira/browse/KAFKA-13266
Project: Kafka
Issue Type: Bug
Affects Versions: 3.0.0
Reporter: David Jacot
Assignee: David Jacot
`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes
fails with the following error in the log:
{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728.
	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
	at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
{noformat}
The issue is caused by a race condition in
`ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created
and populated before the partition is removed from the fetcher threads. Until
the partition is removed, the fetcher threads may keep appending to its log and
thereby advance the log end offset, so the fetch offset recorded in the
`InitialFetchState` can be outdated by the time the fetcher threads are
restarted. This matches the error above: fetched offset = 31727, log end
offset = 31728.
The partitions must be removed from the fetcher threads before the
`InitialFetchStates` are created.
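To illustrate the ordering problem, here is a minimal, single-threaded Python model of the interleaving. It is a sketch under stated assumptions, not Kafka code: `Log`, `Fetcher`, and `apply_follower_change` are hypothetical names, and the concurrent fetch is simulated as one explicit `fetch_once` call at the racy point. The offset check in `Log.append` is modeled on the `IllegalStateException` above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Log:
    """Hypothetical stand-in for a partition's local log."""
    log_end_offset: int = 0

    def append(self, fetch_offset: int) -> None:
        # Modeled on the check that failed above: a fetch must start exactly
        # at the log end offset, otherwise the follower log would be corrupted.
        if fetch_offset != self.log_end_offset:
            raise RuntimeError(
                f"Offset mismatch: fetched offset = {fetch_offset}, "
                f"log end offset = {self.log_end_offset}"
            )
        self.log_end_offset += 1


@dataclass
class Fetcher:
    """Hypothetical fetcher thread: maps partition name -> next fetch offset."""
    partitions: dict = field(default_factory=dict)

    def fetch_once(self, logs: dict) -> None:
        # Append one record to each assigned partition and advance its offset.
        for name, offset in list(self.partitions.items()):
            logs[name].append(offset)
            self.partitions[name] = offset + 1

    def remove(self, name: str) -> None:
        self.partitions.pop(name, None)

    def add(self, name: str, initial_offset: int) -> None:
        self.partitions[name] = initial_offset


def apply_follower_change(fixed: bool) -> Optional[str]:
    """Returns None if the restarted fetch succeeds, else the error message."""
    tp = "__consumer_offsets-1"
    logs = {tp: Log(log_end_offset=31727)}
    fetcher = Fetcher({tp: 31727})

    if fixed:
        fetcher.remove(tp)                    # 1. stop fetching the partition
        fetcher.fetch_once(logs)              # concurrent fetch no longer touches tp
        initial = logs[tp].log_end_offset     # 2. snapshot is now stable (31727)
    else:
        initial = logs[tp].log_end_offset     # 1. snapshot first (31727)
        fetcher.fetch_once(logs)              # race: fetcher appends, LEO -> 31728
        fetcher.remove(tp)                    # 2. removal comes too late

    fetcher.add(tp, initial)                  # restart with the snapshotted offset
    try:
        fetcher.fetch_once(logs)
        return None
    except RuntimeError as e:
        return str(e)
```

With `fixed=False` the model returns `"Offset mismatch: fetched offset = 31727, log end offset = 31728"`, mirroring the failure in the log. With `fixed=True` it returns `None`, because once the partition is removed first no fetcher can advance the log end offset between the snapshot and the restart.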
--
This message was sent by Atlassian Jira
(v8.3.4#803005)