chia7712 commented on code in PR #13075:
URL: https://github.com/apache/kafka/pull/13075#discussion_r1063758284


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -205,6 +205,63 @@ class ReplicaManagerTest {
     when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers)
   }
 
+  @Test
+  def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val logManager = TestUtils.createLogManager(config.logDirs.map(new 
File(_)), new LogConfig(new Properties()))
+    val metadataCache: MetadataCache = mock(classOf[MetadataCache])
+    mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
+    
when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    val rm = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = logManager,
+      quotaManagers = quotaManager,
+      metadataCache = metadataCache,
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+    val partition = rm.createPartition(new TopicPartition(topic, 0))
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+
+    rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 
brokerEpoch,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(0)
+        .setControllerEpoch(0)
+        .setLeader(0)
+        .setLeaderEpoch(0)
+        .setIsr(Seq[Integer](0).asJava)
+        .setPartitionEpoch(0)
+        .setReplicas(Seq[Integer](0).asJava)
+        .setIsNew(false)).asJava,
+      Collections.singletonMap(topic, Uuid.randomUuid()),
+      Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
+    appendRecords(rm, new TopicPartition(topic, 0),
+      MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first 
message".getBytes()), new SimpleRecord("second message".getBytes())))
+    logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), 
dir2.getAbsolutePath)
+
+    partition.createLogIfNotExists(isNew = true, isFutureReplica = true,
+      new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None)
+    // remove cache to disable OffsetsForLeaderEpoch API
+    partition.futureLog.get.leaderEpochCache = None
+
+    // this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error
+    val result = rm.maybeAddLogDirFetchers(Set(partition), new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None)
+    assertEquals(1, result.size)
+    assertEquals(0L, result(new TopicPartition(topic, 0)).initOffset)

Review Comment:
   @junrao Please take a look at line #258



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to