[
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16595357#comment-16595357
]
ASF GitHub Bot commented on KAFKA-7128:
---------------------------------------
hachikuji closed pull request #5557: KAFKA-7128: Follower has to catch up to
offset within current leader epoch to join ISR
URL: https://github.com/apache/kafka/pull/5557
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index a92340f2a4b..22c1508bb8c 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -62,6 +62,9 @@ class Partition(val topic: String,
private val leaderIsrUpdateLock = new ReentrantReadWriteLock
private var zkVersion: Int = LeaderAndIsr.initialZKVersion
@volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+ // start offset for 'leaderEpoch' above (leader epoch of the current leader
for this partition),
+ // defined when this broker is leader for partition
+ @volatile private var leaderEpochStartOffsetOpt: Option[Long] = None
@volatile var leaderReplicaIdOpt: Option[Int] = None
@volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
@@ -263,6 +266,7 @@ class Partition(val topic: String,
allReplicasMap.clear()
inSyncReplicas = Set.empty[Replica]
leaderReplicaIdOpt = None
+ leaderEpochStartOffsetOpt = None
removePartitionMetrics()
logManager.asyncDelete(topicPartition)
logManager.asyncDelete(topicPartition, isFuture = true)
@@ -287,18 +291,19 @@ class Partition(val topic: String,
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) --
newAssignedReplicas).foreach(removeReplica)
inSyncReplicas = newInSyncReplicas
+ newAssignedReplicas.foreach(id => getOrCreateReplica(id,
partitionStateInfo.isNew))
+ val leaderReplica = getReplica().get
+ val leaderEpochStartOffset = leaderReplica.logEndOffset.messageOffset
info(s"$topicPartition starts at Leader Epoch
${partitionStateInfo.basePartitionState.leaderEpoch} from " +
- s"offset ${getReplica().get.logEndOffset.messageOffset}. Previous
Leader Epoch was: $leaderEpoch")
+ s"offset $leaderEpochStartOffset. Previous Leader Epoch was:
$leaderEpoch")
//We cache the leader epoch here, persisting it only if it's local
(hence having a log dir)
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
- newAssignedReplicas.foreach(id => getOrCreateReplica(id,
partitionStateInfo.isNew))
-
+ leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
zkVersion = partitionStateInfo.basePartitionState.zkVersion
val isNewLeader = leaderReplicaIdOpt.map(_ !=
localBrokerId).getOrElse(true)
- val leaderReplica = getReplica().get
val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
val curTimeMs = time.milliseconds
// initialize lastCaughtUpTime of replicas as well as their
lastFetchTimeMs and lastFetchLeaderLogEndOffset.
@@ -344,6 +349,7 @@ class Partition(val topic: String,
(assignedReplicas.map(_.brokerId) --
newAssignedReplicas).foreach(removeReplica)
inSyncReplicas = Set.empty[Replica]
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
+ leaderEpochStartOffsetOpt = None
zkVersion = partitionStateInfo.basePartitionState.zkVersion
// If the leader is unchanged and the epochs are no more than one change
apart, indicate that no follower changes are required
@@ -388,7 +394,11 @@ class Partition(val topic: String,
/**
* Check and maybe expand the ISR of the partition.
- * A replica will be added to ISR if its LEO >= current hw of the partition.
+ * A replica will be added to ISR if its LEO >= current hw of the partition
and it is caught up to
+ * an offset within the current leader epoch. A replica must be caught up to
the current leader
+ * epoch before it can join ISR, because otherwise, if there is committed
data between current
+ * leader's HW and LEO, the replica may become the leader before it fetches
the committed data
+ * and the data will be lost.
*
* Technically, a replica shouldn't be in ISR if it hasn't caught up for
longer than replicaLagTimeMaxMs,
* even if its log end offset is >= HW. However, to be consistent with how
the follower determines
@@ -405,9 +415,11 @@ class Partition(val topic: String,
case Some(leaderReplica) =>
val replica = getReplica(replicaId).get
val leaderHW = leaderReplica.highWatermark
+ val fetchOffset =
logReadResult.info.fetchOffsetMetadata.messageOffset
if (!inSyncReplicas.contains(replica) &&
assignedReplicas.map(_.brokerId).contains(replicaId) &&
- replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
+ replica.logEndOffset.offsetDiff(leaderHW) >= 0 &&
+ leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {
val newInSyncReplicas = inSyncReplicas + replica
info(s"Expanding ISR from
${inSyncReplicas.map(_.brokerId).mkString(",")} " +
s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 96c1147c9bf..343693e82c7 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -27,6 +27,7 @@ import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{LogConfig, LogManager, CleanerConfig}
import kafka.server._
import kafka.utils.{MockTime, TestUtils, MockScheduler}
+import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ReplicaNotAvailableException
import org.apache.kafka.common.metrics.Metrics
@@ -36,6 +37,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.scalatest.Assertions.assertThrows
+import org.easymock.EasyMock
import scala.collection.JavaConverters._
@@ -72,10 +74,16 @@ class PartitionTest {
val brokerProps = TestUtils.createBrokerConfig(brokerId,
TestUtils.MockZkConnect)
brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1,
logDir2).map(_.getAbsolutePath).mkString(","))
val brokerConfig = KafkaConfig.fromProps(brokerProps)
+ val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
replicaManager = new ReplicaManager(
- config = brokerConfig, metrics, time, zkClient = null, new
MockScheduler(time),
+ config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new
MockScheduler(time),
logManager, new AtomicBoolean(false),
QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
brokerTopicStats, new MetadataCache(brokerId), new
LogDirFailureChannel(brokerConfig.logDirs.size))
+
+ EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(),
EasyMock.anyString())).andReturn(logProps).anyTimes()
+ EasyMock.expect(kafkaZkClient.conditionalUpdatePath(EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
+ .andReturn((true, 0)).anyTimes()
+ EasyMock.replay(kafkaZkClient)
}
@After
@@ -230,6 +238,82 @@ class PartitionTest {
assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
}
+ @Test
+ def
testFollowerDoesNotJoinISRUntilCaughtUpToOffsetWithinCurrentLeaderEpoch(): Unit
= {
+ val controllerEpoch = 3
+ val leader = brokerId
+ val follower1 = brokerId + 1
+ val follower2 = brokerId + 2
+ val controllerId = brokerId + 3
+ val replicas = List[Integer](leader, follower1, follower2).asJava
+ val isr = List[Integer](leader, follower2).asJava
+ val leaderEpoch = 8
+ val batch1 = TestUtils.records(records = List(new
SimpleRecord("k1".getBytes, "v1".getBytes),
+ new
SimpleRecord("k2".getBytes, "v2".getBytes)))
+ val batch2 = TestUtils.records(records = List(new
SimpleRecord("k3".getBytes, "v1".getBytes),
+ new
SimpleRecord("k4".getBytes, "v2".getBytes),
+ new
SimpleRecord("k5".getBytes, "v3".getBytes)))
+ val batch3 = TestUtils.records(records = List(new
SimpleRecord("k6".getBytes, "v1".getBytes),
+ new
SimpleRecord("k7".getBytes, "v2".getBytes)))
+
+ val partition = new Partition(topicPartition.topic,
topicPartition.partition, time, replicaManager)
+ assertTrue("Expected first makeLeader() to return 'leader changed'",
+ partition.makeLeader(controllerId, new
LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr,
1, replicas, true), 0))
+ assertEquals("Current leader epoch", leaderEpoch, partition.getLeaderEpoch)
+ assertEquals("ISR", Set[Integer](leader, follower2),
partition.inSyncReplicas.map(_.brokerId))
+
+ // after makeLeader(() call, partition should know about all the replicas
+ val leaderReplica = partition.getReplica(leader).get
+ val follower1Replica = partition.getReplica(follower1).get
+ val follower2Replica = partition.getReplica(follower2).get
+
+ // append records with initial leader epoch
+ val lastOffsetOfFirstBatch = partition.appendRecordsToLeader(batch1,
isFromClient = true).lastOffset
+ partition.appendRecordsToLeader(batch2, isFromClient = true)
+ assertEquals("Expected leader's HW not move",
leaderReplica.logStartOffset, leaderReplica.highWatermark.messageOffset)
+
+ // let the follower in ISR move leader's HW to move further but below LEO
+ def readResult(fetchInfo: FetchDataInfo, leaderReplica: Replica):
LogReadResult = {
+ LogReadResult(info = fetchInfo,
+ highWatermark = leaderReplica.highWatermark.messageOffset,
+ leaderLogStartOffset = leaderReplica.logStartOffset,
+ leaderLogEndOffset =
leaderReplica.logEndOffset.messageOffset,
+ followerLogStartOffset = 0,
+ fetchTimeMs = time.milliseconds,
+ readSize = 10240,
+ lastStableOffset = None)
+ }
+ partition.updateReplicaLogReadResult(
+ follower2Replica, readResult(FetchDataInfo(LogOffsetMetadata(0),
batch1), leaderReplica))
+ partition.updateReplicaLogReadResult(
+ follower2Replica,
readResult(FetchDataInfo(LogOffsetMetadata(lastOffsetOfFirstBatch), batch2),
leaderReplica))
+ assertEquals("Expected leader's HW", lastOffsetOfFirstBatch,
leaderReplica.highWatermark.messageOffset)
+
+ // current leader becomes follower and then leader again (without any new
records appended)
+ partition.makeFollower(
+ controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch,
follower2, leaderEpoch + 1, isr, 1, replicas, false), 1)
+ assertTrue("Expected makeLeader() to return 'leader changed' after
makeFollower()",
+ partition.makeLeader(controllerEpoch, new
LeaderAndIsrRequest.PartitionState(
+ controllerEpoch, leader, leaderEpoch + 2, isr, 1, replicas,
false), 2))
+ val currentLeaderEpochStartOffset =
leaderReplica.logEndOffset.messageOffset
+
+ // append records with the latest leader epoch
+ partition.appendRecordsToLeader(batch3, isFromClient = true)
+
+ // fetch from follower not in ISR from log start offset should not add
this follower to ISR
+ partition.updateReplicaLogReadResult(follower1Replica,
+
readResult(FetchDataInfo(LogOffsetMetadata(0), batch1), leaderReplica))
+ partition.updateReplicaLogReadResult(follower1Replica,
+
readResult(FetchDataInfo(LogOffsetMetadata(lastOffsetOfFirstBatch), batch2),
leaderReplica))
+ assertEquals("ISR", Set[Integer](leader, follower2),
partition.inSyncReplicas.map(_.brokerId))
+
+ // fetch from the follower not in ISR from start offset of the current
leader epoch should
+ // add this follower to ISR
+ partition.updateReplicaLogReadResult(follower1Replica,
+
readResult(FetchDataInfo(LogOffsetMetadata(currentLeaderEpochStartOffset),
batch3), leaderReplica))
+ assertEquals("ISR", Set[Integer](leader, follower1, follower2),
partition.inSyncReplicas.map(_.brokerId))
+ }
+
def createRecords(records: Iterable[SimpleRecord], baseOffset: Long,
partitionLeaderEpoch: Int = 0): MemoryRecords = {
val buf =
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Lagging high watermark can lead to committed data loss after ISR expansion
> --------------------------------------------------------------------------
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Anna Povzner
> Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know
> that the high watermark can go backwards after a leader failover, but we may
> not have known that this can lead to the loss of committed data.
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun
> fetching. The data up to offset 10 has been committed to the ISR. Here is the
> initial state:
> State 1
> ISR: (r1, r2)
> Leader: r1
> r1: [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes
> r2 the new leader. The high watermark is still lagging r1.
> State 2
> ISR: (r2)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=0]
> Replica 3 then catches up to the high watermark on r2 and joins the ISR.
> Perhaps its high watermark is lagging behind r2, but this is unimportant.
> State 3
> ISR: (r2, r3)
> Leader: r2
> r1 (offline): [hw=10, leo=10]
> r2: [hw=5, leo=10]
> r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The
> committed data from offsets 5 to 10 has been lost.
> State 4
> ISR: (r3)
> Leader: r3
> r1 (offline): [hw=10, leo=10]
> r2 (offline): [hw=5, leo=10]
> r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high
> watermark had been reached. Since the follower does not know the true high
> watermark for the previous leader's epoch, it should not allow a replica to
> join the ISR until it has caught up to an offset within its own epoch.
> Note this is related to
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)