[ https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598061#comment-16598061 ]
ASF GitHub Bot commented on KAFKA-6859:
---------------------------------------

hachikuji closed pull request #5320: KAFKA-6859: Do not send LeaderEpochRequest for undefined leader epochs
URL: https://github.com/apache/kafka/pull/5320

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0b7167087d1..0c00e5588f2 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -358,7 +358,7 @@ class LogSegment private[log] (val log: FileRecords,
       if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
         leaderEpochCache.foreach { cache =>
-          if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
+          if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to avoid unnecessary warning in cache.assign()
             cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
         }
         updateProducerState(producerStateManager, batch)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 08c4a17f2d7..05dc3566c03 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -147,7 +147,7 @@ class ReplicaAlterLogDirsThread(name: String,
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 56335a62b60..3b1a54f3bb1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -95,7 +95,8 @@ class ReplicaFetcherThread(name: String,
   private val minBytes = brokerConfig.replicaFetchMinBytes
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
-  private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+  private val brokerSupportsLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+
   private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)

   private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get)
@@ -346,19 +347,29 @@ class ReplicaFetcherThread(name: String,
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
     debug(s"Build leaderEpoch request $partitionsWithEpoch")
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }

   override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
     var result: Map[TopicPartition, EpochEndOffset] = null
-    if (shouldSendLeaderEpochRequest) {
-      val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
+    if (brokerSupportsLeaderEpochRequest) {
+      // skip request for partitions without epoch, as their topic log message format doesn't support epochs
+      val (partitionsWithEpoch, partitionsWithoutEpoch) = partitions.partition { case (_, epoch) => epoch != UNDEFINED_EPOCH }
+      val resultWithoutEpoch = partitionsWithoutEpoch.map { case (tp, _) => (tp, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)) }
+
+      if (partitionsWithEpoch.isEmpty) {
+        debug("Skipping leaderEpoch request since all partitions do not have an epoch")
+        return resultWithoutEpoch
+      }
+
+      val partitionsAsJava = partitionsWithEpoch.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
       val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitionsAsJava)
       try {
         val response = leaderEndpoint.sendRequest(epochRequest)
-        result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
-        debug(s"Receive leaderEpoch response $result")
+
+        result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala ++ resultWithoutEpoch
+        debug(s"Receive leaderEpoch response $result; Skipped request for partitions ${partitionsWithoutEpoch.keys}")
       } catch {
         case t: Throwable =>
           warn(s"Error when sending leader epoch request for $partitions", t)
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 23a53056f32..88f5d6bd8e3 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.ListBuffer
 trait LeaderEpochCache {
   def assign(leaderEpoch: Int, offset: Long)
-  def latestEpoch(): Int
+  def latestEpoch: Int
   def endOffsetFor(epoch: Int): (Int, Long)
   def clearAndFlushLatest(offset: Long)
   def clearAndFlushEarliest(offset: Long)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index fbf77404b02..9c759cebc3f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -23,7 +23,9 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
@@ -31,7 +33,7 @@ import org.apache.kafka.common.requests.EpochEndOffset
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
 import org.easymock.EasyMock._
-import org.easymock.{Capture, CaptureType}
+import org.easymock.{Capture, CaptureType, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
@@ -44,6 +46,7 @@ class ReplicaFetcherThreadTest {
   private val t1p1 = new TopicPartition("topic1", 1)
   private val t2p1 = new TopicPartition("topic2", 1)
+  private var toFail = false
   private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)

   @Test
@@ -71,6 +74,14 @@ class ReplicaFetcherThreadTest {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
     props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
     val config = KafkaConfig.fromProps(props)
+    val leaderEndpoint = createMock(classOf[BlockingSend])
+    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+      override def answer(): ClientResponse = {
+        toFail = true // assert no leader request is sent
+        createMock(classOf[ClientResponse])
+      }
+    })
+    replay(leaderEndpoint)
     val thread = new ReplicaFetcherThread(
       name = "bob",
       fetcherId = 0,
@@ -89,9 +100,148 @@ class ReplicaFetcherThreadTest {
       t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
     )

+    assertFalse("Leader request should not have been sent", toFail)
+    assertEquals("results from leader epoch request should have undefined offset", expected, result)
+  }
+
+  /**
+    * If a partition doesn't have an epoch in the cache, this means it either
+    * does not support epochs (log message format below 0.11) or it is a bootstrapping follower.
+    *
+    * In both cases, the partition has an undefined epoch
+    * and there is no use to send the request, as we know the broker will answer with that epoch
+    */
+  @Test
+  def shouldNotSendEpochRequestIfLastEpochUndefinedForAllPartitions(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
+    val config = KafkaConfig.fromProps(props)
+    val leaderEndpoint = createMock(classOf[BlockingSend])
+
+    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+      override def answer(): ClientResponse = {
+        toFail = true // assert no leader request is sent
+        createMock(classOf[ClientResponse])
+      }
+    })
+    replay(leaderEndpoint)
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics = new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(leaderEndpoint))
+
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> UNDEFINED_EPOCH, t1p1 -> UNDEFINED_EPOCH))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+    )
+
+    assertFalse("Leader request should not have been sent", toFail)
     assertEquals("results from leader epoch request should have undefined offset", expected, result)
   }

+  @Test
+  def shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    //Setup all dependencies
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpoch = 5
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+    expect(leaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).once() // t2p1 doesnt support epochs
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    stub(replica, partition, replicaManager)
+
+    //Expectations
+    expect(partition.truncateTo(anyLong(), anyBoolean())).once
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse
+    val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
+      t1p1 -> new EpochEndOffset(leaderEpoch, 1),
+      t2p1 -> new EpochEndOffset(-1, 1)).asJava
+
+    //Create the fetcher thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
+
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+    // topic 1 supports epoch, t2 doesn't
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0, t2p1 -> 0))
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = false, shouldBeTruncatingLog = true, shouldBeDelayed = false)
+
+    //Loop 1
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(1, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Loop 2 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(2, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Loop 3 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(3, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Assert that truncate to is called exactly once (despite two loops)
+    verify(logManager)
+  }
+
+  /**
+    * Assert that all partitions' states are as expected
+    *
+    */
+  def assertPartitionStates(states: PartitionStates[PartitionFetchState], shouldBeReadyForFetch: Boolean,
+                            shouldBeTruncatingLog: Boolean, shouldBeDelayed: Boolean): Unit = {
+    for (tp <- List(t1p0, t1p1, t2p1)) {
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeReadyForFetch) " NOT" else ""} be ready for fetching",
+        shouldBeReadyForFetch, states.stateValue(tp).isReadyForFetch)
+
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeTruncatingLog) " NOT" else ""} be truncating its log",
+        shouldBeTruncatingLog,
+        states.stateValue(tp).isTruncatingLog)
+
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeDelayed) " NOT" else ""} be delayed",
+        shouldBeDelayed,
+        states.stateValue(tp).isDelayed)
+    }
+  }
+
   @Test
   def shouldHandleExceptionFromBlockingSend(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 171bcf3528c..41564a54b05 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -625,7 +625,7 @@ class ReplicaManagerTest {
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
     val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
-    EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
+    EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn((leaderEpochFromLeader, localLogOffset))
     EasyMock.replay(mockLeaderEpochCache)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Follower should not send OffsetForLeaderEpoch for undefined leader epochs
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-6859
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6859
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Anna Povzner
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>             Fix For: 2.1.0
>
> This is more of an optimization than a correctness fix.
> Currently, if the follower is on inter-broker protocol version 0.11 or higher but on an older message format, it does not track leader epochs. However, it will still send an OffsetForLeaderEpoch request to the leader with an undefined epoch, which is guaranteed to return an undefined offset, so the follower simply truncates to its high watermark. Another example is a bootstrapping follower that does not have any leader epochs recorded.
> It is cleaner and more efficient to not send OffsetForLeaderEpoch requests to the leader for undefined leader epochs, since we already know the answer.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
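
[Editor's note] For readers who want the gist of the change without reading the diff above, here is a minimal, self-contained Scala sketch of the idea the PR implements: partitions whose last known leader epoch is undefined are answered locally with an undefined end offset, and a request is sent to the leader only for the remainder. This is an illustration, not Kafka's actual code; Partition, EpochEndOffset, and queryLeader are simplified stand-ins for Kafka's internal types and network round trip.

    object EpochRequestSketch {
      val UndefinedEpoch = -1          // stand-in for EpochEndOffset.UNDEFINED_EPOCH
      val UndefinedEpochOffset = -1L   // stand-in for EpochEndOffset.UNDEFINED_EPOCH_OFFSET

      case class Partition(topic: String, id: Int)
      case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

      // Stand-in for the network round trip to the leader.
      def queryLeader(withEpoch: Map[Partition, Int]): Map[Partition, EpochEndOffset] =
        withEpoch.map { case (tp, epoch) => tp -> EpochEndOffset(epoch, 42L) }

      def fetchEpochs(partitions: Map[Partition, Int]): Map[Partition, EpochEndOffset] = {
        // Split off partitions with an undefined epoch: the leader is guaranteed to
        // answer UNDEFINED for them, so the request for those can be skipped entirely.
        val (withEpoch, withoutEpoch) =
          partitions.partition { case (_, epoch) => epoch != UndefinedEpoch }
        val local = withoutEpoch.map { case (tp, _) =>
          tp -> EpochEndOffset(UndefinedEpoch, UndefinedEpochOffset)
        }
        if (withEpoch.isEmpty) local         // no network round trip at all
        else queryLeader(withEpoch) ++ local // one request for the remainder, merged with local answers
      }

      def main(args: Array[String]): Unit = {
        // topic1-0 has a known epoch and is queried; topic2-1 is answered locally.
        val result = fetchEpochs(Map(
          Partition("topic1", 0) -> 5,
          Partition("topic2", 1) -> UndefinedEpoch))
        println(result)
      }
    }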