David Jacot created KAFKA-13266:
-----------------------------------

             Summary: `InitialFetchState` should be created after partition is removed from the fetchers
                 Key: KAFKA-13266
                 URL: https://issues.apache.org/jira/browse/KAFKA-13266
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.0.0
            Reporter: David Jacot
            Assignee: David Jacot


 

`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes 
fails with the following error in the log:
{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728.
    at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
    at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
    at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
    at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
    at scala.Option.foreach(Option.scala:437)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
{noformat}
 

The failure is caused by a race condition in 
`ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created 
and populated before the partition is removed from the fetcher threads. As a 
result, the fetch offset in the `InitialFetchState` can be stale by the time 
the fetcher threads are restarted, because the still-running fetcher threads 
may have advanced the log end offset in the meantime.

The partitions must be removed from the fetcher threads before the 
`InitialFetchState`s are created.
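
To illustrate the ordering problem, here is a minimal, single-threaded sketch 
that replays the interleaving deterministically. It is not the Kafka code: 
`Log`, `Fetcher`, `fetchOnce` and both `snapshot...` methods are hypothetical 
stand-ins invented for this example, and the only detail taken from the report 
is the pair of offsets in the error message (31727 and 31728).

```java
import java.util.HashSet;
import java.util.Set;

class FetchStateOrderingSketch {

    /** Hypothetical stand-in for a partition's local log; tracks only the log end offset. */
    static final class Log {
        long logEndOffset;
        Log(long logEndOffset) { this.logEndOffset = logEndOffset; }
    }

    /** Hypothetical stand-in for a fetcher thread that appends to the partitions it owns. */
    static final class Fetcher {
        private final Set<String> partitions = new HashSet<>();
        void addPartition(String tp) { partitions.add(tp); }
        void removePartition(String tp) { partitions.remove(tp); }

        /** Simulates one fetch round: appends one record only if the partition is still owned. */
        void fetchOnce(String tp, Log log) {
            if (partitions.contains(tp)) {
                log.logEndOffset++;
            }
        }
    }

    /** Problematic order: the fetch offset is captured before the partition is removed. */
    static boolean snapshotBeforeRemoval() {
        String tp = "__consumer_offsets-1";
        Log log = new Log(31727L);
        Fetcher fetcher = new Fetcher();
        fetcher.addPartition(tp);

        long initialFetchOffset = log.logEndOffset; // captured too early: 31727
        fetcher.fetchOnce(tp, log);                 // fetcher still owns the partition: LEO is now 31728
        fetcher.removePartition(tp);

        // true means the captured offset no longer matches the log end offset
        return initialFetchOffset != log.logEndOffset;
    }

    /** Proposed order: the partition is removed first, then the fetch offset is captured. */
    static boolean snapshotAfterRemoval() {
        String tp = "__consumer_offsets-1";
        Log log = new Log(31727L);
        Fetcher fetcher = new Fetcher();
        fetcher.addPartition(tp);

        fetcher.removePartition(tp);
        long initialFetchOffset = log.logEndOffset; // nothing can advance the log any more
        fetcher.fetchOnce(tp, log);                 // no-op: partition is no longer owned

        return initialFetchOffset != log.logEndOffset;
    }

    public static void main(String[] args) {
        System.out.println("snapshot before removal, mismatch: " + snapshotBeforeRemoval());
        System.out.println("snapshot after removal, mismatch: " + snapshotAfterRemoval());
    }
}
```

In this model the first ordering ends with a captured offset of 31727 against 
a log end offset of 31728, which is the same shape as the mismatch in the log 
above; the second ordering cannot diverge because nothing appends to the log 
once the partition has been removed.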



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
