[
https://issues.apache.org/jira/browse/KAFKA-7164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577880#comment-16577880
]
ASF GitHub Bot commented on KAFKA-7164:
---------------------------------------
hachikuji closed pull request #5436: KAFKA-7164: Follower should truncate after
every leader epoch change
URL: https://github.com/apache/kafka/pull/5436
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index 154a8f969c5..a92340f2a4b 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -326,12 +326,15 @@ class Partition(val topic: String,
/**
* Make the local replica the follower by setting the new leader and ISR to
empty
- * If the leader replica id does not change, return false to indicate the
replica manager
+ * If the leader replica id does not change and the new epoch is equal or
one
+ * greater (that is, no updates have been missed), return false to indicate
to the
+ * replica manager that state is already correct and the become-follower
steps can be skipped
*/
def makeFollower(controllerId: Int, partitionStateInfo:
LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newAssignedReplicas =
partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
- val newLeaderBrokerId: Int = partitionStateInfo.basePartitionState.leader
+ val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
+ val oldLeaderEpoch = leaderEpoch
// record the epoch of the controller that made the leadership decision.
This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper
path
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
@@ -343,7 +346,9 @@ class Partition(val topic: String,
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
zkVersion = partitionStateInfo.basePartitionState.zkVersion
- if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get ==
newLeaderBrokerId) {
+ // If the leader is unchanged and the epochs are no more than one change
apart, indicate that no follower changes are required
+ // Otherwise, we missed a leader epoch update, which means the leader's
log may have been truncated prior to the current epoch.
+ if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && (leaderEpoch ==
oldLeaderEpoch || leaderEpoch == oldLeaderEpoch + 1)) {
false
}
else {
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 41bdefddb76..96c1147c9bf 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -32,9 +32,11 @@ import
org.apache.kafka.common.errors.ReplicaNotAvailableException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.record._
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.scalatest.Assertions.assertThrows
+
import scala.collection.JavaConverters._
class PartitionTest {
@@ -207,6 +209,27 @@ class PartitionTest {
}
}
+ @Test
+ def testMakeFollowerWithNoLeaderIdChange(): Unit = {
+ val partition = new Partition(topicPartition.topic,
topicPartition.partition, time, replicaManager)
+
+ // Start off as follower
+ var partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 1,
List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+ partition.makeFollower(0, partitionStateInfo, 0)
+
+ // Request with same leader and epoch increases by more than 1, perform
become-follower steps
+ partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 3,
List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+ assertTrue(partition.makeFollower(0, partitionStateInfo, 1))
+
+ // Request with same leader and epoch increases by only 1, skip
become-follower steps
+ partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+ assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
+
+ // Request with same leader and same epoch, skip become-follower steps
+ partitionStateInfo = new LeaderAndIsrRequest.PartitionState(0, 1, 4,
List[Integer](0, 1, 2).asJava, 1, List[Integer](0, 1, 2).asJava, false)
+ assertFalse(partition.makeFollower(0, partitionStateInfo, 2))
+ }
+
def createRecords(records: Iterable[SimpleRecord], baseOffset: Long,
partitionLeaderEpoch: Int = 0): MemoryRecords = {
val buf =
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
val builder = MemoryRecords.builder(
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 56d4b7919e4..171bcf3528c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -19,21 +19,26 @@ package kafka.server
import java.io.File
import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.log.LogConfig
+import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker
+import kafka.cluster.BrokerEndPoint
+import kafka.server.epoch.LeaderEpochCache
+import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.timer.MockTimer
import kafka.zk.KafkaZkClient
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest}
+import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel,
LeaderAndIsrRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.zookeeper.data.Stat
import org.easymock.EasyMock
@@ -51,6 +56,11 @@ class ReplicaManagerTest {
var zkClient: ZkClient = _
var kafkaZkClient: KafkaZkClient = _
+ // Constants defined for readability
+ val zkVersion = 0
+ val correlationId = 0
+ var controllerEpoch = 0
+
@Before
def setUp() {
zkClient = EasyMock.createMock(classOf[ZkClient])
@@ -504,6 +514,216 @@ class ReplicaManagerTest {
}
}
+ /**
+ * If a partition becomes a follower and the leader is unchanged it should
check for truncation
+ * if the epoch has increased by more than one (which suggests it has
missed an update)
+ */
+ @Test
+ def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate() {
+ val topicPartition = 0
+ val followerBrokerId = 0
+ val leaderBrokerId = 1
+ val controllerId = 0
+ val controllerEpoch = 0
+ var leaderEpoch = 1
+ val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
+ val countDownLatch = new CountDownLatch(1)
+
+ // Prepare the mocked components for the test
+ val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+ topicPartition, followerBrokerId, leaderBrokerId, countDownLatch,
expectTruncation = true)
+
+ // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
+ val partition = replicaManager.getOrCreatePartition(new
TopicPartition(topic, topicPartition))
+ partition.getOrCreateReplica(followerBrokerId)
+ partition.makeFollower(controllerId,
+ leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
+ correlationId)
+
+ // Make local partition a follower - because epoch increased by more than
1, truncation should
+ // trigger even though leader does not change
+ leaderEpoch += 2
+ val leaderAndIsrRequest0 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+ controllerId, controllerEpoch,
+ collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
+ leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId,
aliveBrokerIds)).asJava,
+ Set(new Node(followerBrokerId, "host1", 0),
+ new Node(leaderBrokerId, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
+ (_, followers) => assertEquals(followerBrokerId,
followers.head.partitionId))
+ assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
+
+ // Truncation should have happened once
+ EasyMock.verify(mockLogMgr)
+ }
+
+ /**
+ * If a partition becomes a follower and the leader is unchanged but no
epoch update
+ * has been missed, it should not check for truncation
+ */
+ @Test
+ def testDontBecomeFollowerWhenNoMissedLeaderUpdate() {
+ val topicPartition = 0
+ val followerBrokerId = 0
+ val leaderBrokerId = 1
+ val controllerId = 0
+ var leaderEpoch = 1
+ val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
+ val countDownLatch = new CountDownLatch(1)
+
+ // Prepare the mocked components for the test
+ val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
+ topicPartition, followerBrokerId, leaderBrokerId, countDownLatch,
expectTruncation = false)
+
+ // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
+ val partition = replicaManager.getOrCreatePartition(new
TopicPartition(topic, topicPartition))
+ partition.getOrCreateReplica(followerBrokerId)
+ partition.makeFollower(controllerId,
+ leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
+ correlationId)
+
+ // Make local partition a follower - because epoch did not change,
truncation should not trigger
+ val leaderAndIsrRequest0 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+ controllerId, controllerEpoch,
+ collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
+ leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId,
aliveBrokerIds)).asJava,
+ Set(new Node(followerBrokerId, "host1", 0),
+ new Node(leaderBrokerId, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest0,
+ (_, followers) => assertTrue(followers.isEmpty))
+
+ // Make local partition a follower - because epoch increased by only 1 and
leader did not change,
+ // truncation should not trigger
+ leaderEpoch += 1
+ val leaderAndIsrRequest1 = new
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
+ controllerId, controllerEpoch,
+ collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
+ leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId,
aliveBrokerIds)).asJava,
+ Set(new Node(followerBrokerId, "host1", 0),
+ new Node(leaderBrokerId, "host2", 1)).asJava).build()
+ replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1,
+ (_, followers) => assertTrue(followers.isEmpty))
+
+ // Truncation should not have happened
+ EasyMock.verify(mockLogMgr)
+ }
+
+ private def prepareReplicaManagerAndLogManager(topicPartition: Int,
+ followerBrokerId: Int,
+ leaderBrokerId: Int,
+ countDownLatch:
CountDownLatch,
+ expectTruncation: Boolean) :
(ReplicaManager, LogManager) = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+ val config = KafkaConfig.fromProps(props)
+
+ // Setup mock local log to have leader epoch of 3 and offset of 10
+ val localLogOffset = 10
+ val offsetFromLeader = 5
+ val leaderEpochFromLeader = 3
+ val mockScheduler = new MockScheduler(time)
+ val mockBrokerTopicStats = new BrokerTopicStats
+ val mockLogDirFailureChannel = new
LogDirFailureChannel(config.logDirs.size)
+ val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
+
EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
+ EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
+ .andReturn((leaderEpochFromLeader, localLogOffset))
+ EasyMock.replay(mockLeaderEpochCache)
+ val mockLog = new Log(
+ dir = new File(new File(config.logDirs.head), s"$topic-0"),
+ config = LogConfig(),
+ logStartOffset = 0L,
+ recoveryPoint = 0L,
+ scheduler = mockScheduler,
+ brokerTopicStats = mockBrokerTopicStats,
+ time = time,
+ maxProducerIdExpirationMs = 30000,
+ producerIdExpirationCheckIntervalMs = 30000,
+ topicPartition = new TopicPartition(topic, topicPartition),
+ producerStateManager = new ProducerStateManager(new
TopicPartition(topic, topicPartition),
+ new File(new File(config.logDirs.head), s"$topic-$topicPartition"),
30000),
+ logDirFailureChannel = mockLogDirFailureChannel) {
+
+ override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache
+
+ override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset)
+ }
+
+ // Expect to call LogManager.truncateTo exactly once
+ val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+ EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new
File(_).getAbsoluteFile)).anyTimes
+ EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig())
+ EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic,
topicPartition),
+ LogConfig(), isNew = false, isFuture =
false)).andReturn(mockLog).anyTimes
+ if (expectTruncation) {
+ EasyMock.expect(mockLogMgr.truncateTo(Map(new TopicPartition(topic,
topicPartition) -> offsetFromLeader),
+ isFuture = false)).once
+ }
+ EasyMock.replay(mockLogMgr)
+
+ val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
+ val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId,
s"host$brokerId", brokerId))
+
+ val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes
+ aliveBrokerIds.foreach { brokerId =>
+
EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes
+ }
+ EasyMock.replay(metadataCache)
+
+ val timer = new MockTimer
+ val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
+ purgatoryName = "Produce", timer, reaperEnabled = false)
+ val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+ purgatoryName = "Fetch", timer, reaperEnabled = false)
+ val mockDeleteRecordsPurgatory = new
DelayedOperationPurgatory[DelayedDeleteRecords](
+ purgatoryName = "DeleteRecords", timer, reaperEnabled = false)
+
+ // Mock network client to show leader offset of 5
+ val quota = QuotaFactory.instantiate(config, metrics, time, "")
+ val blockingSend = new ReplicaFetcherMockBlockingSend(Map(new
TopicPartition(topic, topicPartition) ->
+ new EpochEndOffset(leaderEpochFromLeader, offsetFromLeader)).asJava,
BrokerEndPoint(1, "host1" ,1), time)
+ val replicaManager = new ReplicaManager(config, metrics, time,
kafkaZkClient, mockScheduler, mockLogMgr,
+ new AtomicBoolean(false), quota, mockBrokerTopicStats,
+ metadataCache, mockLogDirFailureChannel, mockProducePurgatory,
mockFetchPurgatory,
+ mockDeleteRecordsPurgatory, Option(this.getClass.getName)) {
+
+ override protected def createReplicaFetcherManager(metrics: Metrics,
+ time: Time,
+ threadNamePrefix:
Option[String],
+ quotaManager:
ReplicationQuotaManager): ReplicaFetcherManager = {
+ new ReplicaFetcherManager(config, this, metrics, time,
threadNamePrefix, quotaManager) {
+
+ override def createFetcherThread(fetcherId: Int, sourceBroker:
BrokerEndPoint): AbstractFetcherThread = {
+ new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId",
fetcherId,
+ sourceBroker, config, replicaManager, metrics, time,
quota.follower, Some(blockingSend)) {
+
+ override def doWork() = {
+ // In case the thread starts before the partition is added by
AbstractFetcherManager,
+ // add it here (it's a no-op if already added)
+ addPartitions(Map(new TopicPartition(topic, topicPartition) ->
0L))
+ super.doWork()
+
+ // Shut the thread down after one iteration to avoid
double-counting truncations
+ initiateShutdown()
+ countDownLatch.countDown()
+ }
+ }
+ }
+ }
+ }
+ }
+
+ (replicaManager, mockLogMgr)
+ }
+
+ private def leaderAndIsrPartitionState(leaderEpoch: Int,
+ leaderBrokerId: Int,
+ aliveBrokerIds: Seq[Integer]) :
LeaderAndIsrRequest.PartitionState = {
+ new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderBrokerId,
leaderEpoch, aliveBrokerIds.asJava,
+ zkVersion, aliveBrokerIds.asJava, false)
+ }
+
private class CallbackResult[T] {
private var value: Option[T] = None
private var fun: Option[T => Unit] = None
@@ -532,7 +752,8 @@ class ReplicaManagerTest {
private def appendRecords(replicaManager: ReplicaManager,
partition: TopicPartition,
records: MemoryRecords,
- isFromClient: Boolean = true):
CallbackResult[PartitionResponse] = {
+ isFromClient: Boolean = true,
+ requiredAcks: Short = -1):
CallbackResult[PartitionResponse] = {
val result = new CallbackResult[PartitionResponse]()
def appendCallback(responses: Map[TopicPartition, PartitionResponse]):
Unit = {
val response = responses.get(partition)
@@ -542,7 +763,7 @@ class ReplicaManagerTest {
replicaManager.appendRecords(
timeout = 1000,
- requiredAcks = -1,
+ requiredAcks = requiredAcks,
internalTopicsAllowed = false,
isFromClient = isFromClient,
entriesPerPartition = Map(partition -> records),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Follower should truncate after every leader epoch change
> --------------------------------------------------------
>
> Key: KAFKA-7164
> URL: https://issues.apache.org/jira/browse/KAFKA-7164
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Bob Barrett
> Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently we skip log truncation for followers when a LeaderAndIsr request is
> received but the leader does not change. This can lead to log divergence if
> the follower missed a leader change before the current known leader was
> re-elected. The problem is that the leader may truncate its own log before
> becoming leader again, so the follower needs to reconcile its log with the
> leader's once more.
> For example, suppose that we have three replicas: r1, r2, and r3. Initially,
> r1 is the leader in epoch 0 and writes one record at offset 0. r3 replicates
> this successfully.
> {code}
> r1:
> status: leader
> epoch: 0
> log: [{id: 0, offset: 0, epoch:0}]
> r2:
> status: follower
> epoch: 0
> log: []
> r3:
> status: follower
> epoch: 0
> log: [{id: 0, offset: 0, epoch:0}]
> {code}
> Suppose then that r2 becomes leader in epoch 1. r1 notices the leader change
> and truncates, but r3, for whatever reason, does not.
> {code}
> r1:
> status: follower
> epoch: 1
> log: []
> r2:
> status: leader
> epoch: 1
> log: []
> r3:
> status: follower
> epoch: 0
> log: [{id: 0, offset: 0, epoch:0}]
> {code}
> Now suppose that r2 fails and r1 becomes the leader in epoch 2. Immediately
> it writes a new record:
> {code}
> r1:
> status: leader
> epoch: 2
> log: [{id: 1, offset: 0, epoch:2}]
> r2:
> status: follower
> epoch: 2
> log: []
> r3:
> status: follower
> epoch: 0
> log: [{id: 0, offset: 0, epoch:0}]
> {code}
> If r3 continues fetching with the old epoch, we can have log divergence as
> noted in KAFKA-6880. However, even if r3 successfully receives the new
> LeaderAndIsr request that updates the epoch to 2, the logs will stay
> inconsistent if it skips truncation: r3 keeps record id 0 (epoch 0) at
> offset 0, while the leader r1 has record id 1 (epoch 2) at the same offset.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)