[ https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot updated KAFKA-13266:
--------------------------------
Description:
`ReplicationTest.test_replication_with_broker_failure` in KRaft mode sometimes
fails with the following error in the log:
{noformat}
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Unexpected error occurred while processing data for partition __consumer_offsets-1 at offset 31727 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: Offset mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end offset = 31728.
	at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
	at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)
[2021-08-31 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Partition __consumer_offsets-1 marked as failed (kafka.server.ReplicaFetcherThread)
{noformat}
The issue is caused by a race condition in
`ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState`, including
its fetch offset, is created and populated before the partition is removed from
the fetcher threads. Because the fetcher threads keep appending to the log until
the partition is removed, they can advance the log end offset in between, so the
fetch offset in the `InitialFetchState` may already be stale when the partition
is handed back to the fetcher threads. The fetcher then hits the offset mismatch
shown in the log above (fetched offset = 31727, log end offset = 31728) and marks
the partition as failed.
The partitions must be removed from the fetcher threads before the
`InitialFetchState`s are created.
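To make the ordering concrete, here is a minimal, single-threaded Scala sketch of the race. It is not Kafka's code: `SimLog`, `SimFetcher`, `transition` and the one-field `InitialFetchState` are hypothetical stand-ins, and the concurrently running fetcher thread is simulated by an explicit `processFetchedRecord` call placed between creating the state and removing the partition. Creating the state before the removal reproduces the mismatch; removing the partition first does not.

```scala
import scala.collection.mutable

object FetchStateRace {
  // Hypothetical one-field stand-in for Kafka's InitialFetchState; the real
  // class carries more fields. Only the fetch offset matters for this race.
  final case class InitialFetchState(fetchOffset: Long)

  // Minimal follower log that only tracks its log end offset.
  final class SimLog {
    var logEndOffset: Long = 0L
    def append(records: Int): Unit = logEndOffset += records
  }

  // Hypothetical fetcher: tracks the next fetch offset per assigned partition and
  // mimics the consistency check behind the "Offset mismatch" error.
  final class SimFetcher(log: SimLog) {
    private val fetchOffsets = mutable.Map.empty[String, Long]

    def addPartition(tp: String, state: InitialFetchState): Unit =
      fetchOffsets(tp) = state.fetchOffset

    def removePartition(tp: String): Unit = fetchOffsets.remove(tp)

    // Appends one fetched record for `tp`, but only while `tp` is still assigned.
    def processFetchedRecord(tp: String): Unit =
      fetchOffsets.get(tp).foreach { fetchOffset =>
        if (fetchOffset != log.logEndOffset)
          throw new IllegalStateException(
            s"Offset mismatch for partition $tp: fetched offset = $fetchOffset, " +
              s"log end offset = ${log.logEndOffset}.")
        log.append(1)
        fetchOffsets(tp) = fetchOffset + 1
      }
  }

  // Simulates one follower transition and returns the outcome of the first
  // fetch afterwards. `createStateBeforeRemoval = true` is the buggy ordering.
  def transition(createStateBeforeRemoval: Boolean): Either[String, Long] = {
    val tp = "__consumer_offsets-1"
    val log = new SimLog
    val fetcher = new SimFetcher(log)
    fetcher.addPartition(tp, InitialFetchState(log.logEndOffset))
    fetcher.processFetchedRecord(tp) // normal replication before the change

    val state =
      if (createStateBeforeRemoval) {
        val early = InitialFetchState(log.logEndOffset) // offset captured too early
        fetcher.processFetchedRecord(tp)                // fetcher still running: LEO advances
        fetcher.removePartition(tp)
        early
      } else {
        fetcher.processFetchedRecord(tp)                // fetcher still running
        fetcher.removePartition(tp)                     // no more appends after this point
        InitialFetchState(log.logEndOffset)             // offset is now current
      }

    fetcher.addPartition(tp, state)
    try {
      fetcher.processFetchedRecord(tp)
      Right(log.logEndOffset)
    } catch {
      case e: IllegalStateException => Left(e.getMessage)
    }
  }
}
```

Calling `FetchStateRace.transition(createStateBeforeRemoval = true)` returns a `Left` carrying an `Offset mismatch` message analogous to the one in the log, while `false` returns a `Right` with the advanced log end offset.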
> `InitialFetchState` should be created after partition is removed from the fetchers
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-13266
> URL: https://issues.apache.org/jira/browse/KAFKA-13266
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.0
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Critical
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)