[ https://issues.apache.org/jira/browse/KAFKA-6859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16598061#comment-16598061 ]

ASF GitHub Bot commented on KAFKA-6859:
---------------------------------------

hachikuji closed pull request #5320: KAFKA-6859: Do not send LeaderEpochRequest 
for undefined leader epochs
URL: https://github.com/apache/kafka/pull/5320
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0b7167087d1..0c00e5588f2 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -358,7 +358,7 @@ class LogSegment private[log] (val log: FileRecords,
 
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
           leaderEpochCache.foreach { cache =>
-            if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
+            if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to avoid unnecessary warning in cache.assign()
               cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
           }
           updateProducerState(producerStateManager, batch)
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 08c4a17f2d7..05dc3566c03 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -147,7 +147,7 @@ class ReplicaAlterLogDirsThread(name: String,
 
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 56335a62b60..3b1a54f3bb1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -95,7 +95,8 @@ class ReplicaFetcherThread(name: String,
   private val minBytes = brokerConfig.replicaFetchMinBytes
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
-  private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+  private val brokerSupportsLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+
   private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
 
   private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] =  replicaMgr.getReplica(tp).map(_.epochs.get)
@@ -346,19 +347,29 @@ class ReplicaFetcherThread(name: String,
     val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }
 
     debug(s"Build leaderEpoch request $partitionsWithEpoch")
-    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
+    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch }
     ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
   }
 
   override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
     var result: Map[TopicPartition, EpochEndOffset] = null
-    if (shouldSendLeaderEpochRequest) {
-      val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
+    if (brokerSupportsLeaderEpochRequest) {
+      // skip request for partitions without epoch, as their topic log message format doesn't support epochs
+      val (partitionsWithEpoch, partitionsWithoutEpoch) = partitions.partition { case (_, epoch) => epoch != UNDEFINED_EPOCH }
+      val resultWithoutEpoch = partitionsWithoutEpoch.map { case (tp, _) => (tp, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)) }
+
+      if (partitionsWithEpoch.isEmpty) {
+        debug("Skipping leaderEpoch request since all partitions do not have an epoch")
+        return resultWithoutEpoch
+      }
+
+      val partitionsAsJava = partitionsWithEpoch.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
       val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitionsAsJava)
       try {
         val response = leaderEndpoint.sendRequest(epochRequest)
-        result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
-        debug(s"Receive leaderEpoch response $result")
+
+        result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala ++ resultWithoutEpoch
+        debug(s"Receive leaderEpoch response $result; Skipped request for partitions ${partitionsWithoutEpoch.keys}")
       } catch {
         case t: Throwable =>
           warn(s"Error when sending leader epoch request for $partitions", t)
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index 23a53056f32..88f5d6bd8e3 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.ListBuffer
 
 trait LeaderEpochCache {
   def assign(leaderEpoch: Int, offset: Long)
-  def latestEpoch(): Int
+  def latestEpoch: Int
   def endOffsetFor(epoch: Int): (Int, Long)
   def clearAndFlushLatest(offset: Long)
   def clearAndFlushEarliest(offset: Long)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index fbf77404b02..9c759cebc3f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -23,7 +23,9 @@ import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
@@ -31,7 +33,7 @@ import org.apache.kafka.common.requests.EpochEndOffset
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
 import org.easymock.EasyMock._
-import org.easymock.{Capture, CaptureType}
+import org.easymock.{Capture, CaptureType, IAnswer}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -44,6 +46,7 @@ class ReplicaFetcherThreadTest {
   private val t1p1 = new TopicPartition("topic1", 1)
   private val t2p1 = new TopicPartition("topic2", 1)
 
+  private var toFail = false
   private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
 
   @Test
@@ -71,6 +74,14 @@ class ReplicaFetcherThreadTest {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
     props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
     val config = KafkaConfig.fromProps(props)
+    val leaderEndpoint = createMock(classOf[BlockingSend])
+    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+      override def answer(): ClientResponse = {
+        toFail = true // assert no leader request is sent
+        createMock(classOf[ClientResponse])
+      }
+    })
+    replay(leaderEndpoint)
     val thread = new ReplicaFetcherThread(
       name = "bob",
       fetcherId = 0,
@@ -89,9 +100,148 @@ class ReplicaFetcherThreadTest {
       t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
     )
 
+    assertFalse("Leader request should not have been sent", toFail)
+    assertEquals("results from leader epoch request should have undefined offset", expected, result)
+  }
+
+  /**
+    * If a partition doesn't have an epoch in the cache, this means it either
+    *   does not support epochs (log message format below 0.11) or it is a bootstrapping follower.
+    *
+    * In both cases, the partition has an undefined epoch
+    *   and there is no use to send the request, as we know the broker will answer with that epoch
+    */
+  @Test
+  def shouldNotSendEpochRequestIfLastEpochUndefinedForAllPartitions(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
+    val config = KafkaConfig.fromProps(props)
+    val leaderEndpoint = createMock(classOf[BlockingSend])
+
+    expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
+      override def answer(): ClientResponse = {
+        toFail = true // assert no leader request is sent
+        createMock(classOf[ClientResponse])
+      }
+    })
+    replay(leaderEndpoint)
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics =  new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(leaderEndpoint))
+
+
+    val result = thread.fetchEpochsFromLeader(Map(t1p0 -> UNDEFINED_EPOCH, t1p1 -> UNDEFINED_EPOCH))
+
+    val expected = Map(
+      t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
+      t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+    )
+
+    assertFalse("Leader request should not have been sent", toFail)
     assertEquals("results from leader epoch request should have undefined 
offset", expected, result)
   }
 
+  @Test
+  def shouldFetchLeaderEpochRequestIfLastEpochDefinedForSomePartitions(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    //Setup all dependencies
+    val quota = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
+    val logManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica = createNiceMock(classOf[Replica])
+    val partition = createMock(classOf[Partition])
+    val replicaManager = createMock(classOf[ReplicaManager])
+
+    val leaderEpoch = 5
+
+    //Stubs
+    expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
+    expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+    expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
+    expect(leaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).once()  // t2p1 doesnt support epochs
+    expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes()
+    expect(replicaManager.logManager).andReturn(logManager).anyTimes()
+    expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
+    stub(replica, partition, replicaManager)
+
+    //Expectations
+    expect(partition.truncateTo(anyLong(), anyBoolean())).once
+
+    replay(leaderEpochs, replicaManager, logManager, quota, replica)
+
+    //Define the offsets for the OffsetsForLeaderEpochResponse
+    val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1),
+      t1p1 -> new EpochEndOffset(leaderEpoch, 1),
+      t2p1 -> new EpochEndOffset(-1, 1)).asJava
+
+    //Create the fetcher thread
+    val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
+
+    val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, 
replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
+
+    // topic 1 supports epoch, t2 doesn't
+    thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0, t2p1 -> 0))
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = false, shouldBeTruncatingLog = true, shouldBeDelayed = false)
+    //Loop 1
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(1, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Loop 2 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(2, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Loop 3 we should not fetch epochs
+    thread.doWork()
+    assertEquals(1, mockNetwork.epochFetchCount)
+    assertEquals(3, mockNetwork.fetchCount)
+
+    assertPartitionStates(thread.partitionStates, shouldBeReadyForFetch = true, shouldBeTruncatingLog = false, shouldBeDelayed = false)
+
+    //Assert that truncate to is called exactly once (despite two loops)
+    verify(logManager)
+  }
+
+  /**
+    * Assert that all partitions' states are as expected
+    *
+    */
+  def assertPartitionStates(states: PartitionStates[PartitionFetchState], shouldBeReadyForFetch: Boolean,
+                            shouldBeTruncatingLog: Boolean, shouldBeDelayed: Boolean): Unit = {
+    for (tp <- List(t1p0, t1p1, t2p1)) {
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeReadyForFetch) " NOT" else ""} be 
ready for fetching",
+        shouldBeReadyForFetch, states.stateValue(tp).isReadyForFetch)
+
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeTruncatingLog) " NOT" else ""} be 
truncating its log",
+        shouldBeTruncatingLog,
+        states.stateValue(tp).isTruncatingLog)
+
+      assertEquals(
+        s"Partition $tp should${if (!shouldBeDelayed) " NOT" else ""} be 
delayed",
+        shouldBeDelayed,
+        states.stateValue(tp).isDelayed)
+    }
+  }
+
   @Test
   def shouldHandleExceptionFromBlockingSend(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 171bcf3528c..41564a54b05 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -625,7 +625,7 @@ class ReplicaManagerTest {
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
     val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache])
-    EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader)
+    EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn((leaderEpochFromLeader, localLogOffset))
     EasyMock.replay(mockLeaderEpochCache)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Follower should not send OffsetForLeaderEpoch for undefined leader epochs
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-6859
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6859
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Anna Povzner
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>             Fix For: 2.1.0
>
>
> This is more of an optimization than a correctness issue.
> Currently, if the follower is on inter-broker protocol version 0.11 or higher 
> but on an older message format, it does not track leader epochs. However, it 
> will still send an OffsetForLeaderEpoch request to the leader with an undefined 
> epoch, which is guaranteed to return an undefined offset, so the follower 
> truncates to its high watermark. Another example is a bootstrapping follower 
> that does not have any leader epochs recorded.
> It is cleaner and more efficient not to send OffsetForLeaderEpoch requests for 
> undefined leader epochs, since we already know the answer.
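
For illustration, a minimal, self-contained Scala sketch of the skip logic described above. The names used here (LeaderEpochRequestSketch, EpochEndOffsetSketch, sendToLeader) are stand-ins invented for this example rather than the actual broker classes; only the partition-and-merge structure mirrors the merged change:

object LeaderEpochRequestSketch {
  // Stand-ins for Kafka's UNDEFINED_EPOCH / UNDEFINED_EPOCH_OFFSET constants.
  val UndefinedEpoch: Int = -1
  val UndefinedEpochOffset: Long = -1L

  // Simplified stand-in for one entry of an OffsetsForLeaderEpoch response.
  case class EpochEndOffsetSketch(leaderEpoch: Int, endOffset: Long)

  // sendToLeader abstracts the real network round trip (leaderEndpoint.sendRequest in the PR).
  def fetchEpochsFromLeader(partitions: Map[String, Int],
                            sendToLeader: Map[String, Int] => Map[String, EpochEndOffsetSketch]): Map[String, EpochEndOffsetSketch] = {
    // Partitions whose last known epoch is undefined already have a known answer,
    // so they are never included in the request to the leader.
    val (withEpoch, withoutEpoch) = partitions.partition { case (_, epoch) => epoch != UndefinedEpoch }
    val knownAnswers = withoutEpoch.map { case (tp, _) => tp -> EpochEndOffsetSketch(UndefinedEpoch, UndefinedEpochOffset) }

    if (withEpoch.isEmpty) knownAnswers            // nothing left to ask: skip the request entirely
    else sendToLeader(withEpoch) ++ knownAnswers   // query only partitions that actually track epochs
  }
}

For example, fetchEpochsFromLeader(Map("t1p0" -> -1, "t1p1" -> 5), send) passes only "t1p1" to send, while "t1p0" receives the undefined-epoch response locally.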



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
