junrao commented on code in PR #13075: URL: https://github.com/apache/kafka/pull/13075#discussion_r1063808402
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -205,6 +205,63 @@ class ReplicaManagerTest { when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers) } + @Test + def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = { + val dir1 = TestUtils.tempDir() + val dir2 = TestUtils.tempDir() + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) + val config = KafkaConfig.fromProps(props) + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) + val metadataCache: MetadataCache = mock(classOf[MetadataCache]) + mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = logManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + val partition = rm.createPartition(new TopicPartition(topic, 0)) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + + rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(Seq[Integer](0).asJava) + .setPartitionEpoch(0) + .setReplicas(Seq[Integer](0).asJava) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), + Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ()) + appendRecords(rm, new TopicPartition(topic, 0), + 
MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes()))) + logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath) + + partition.createLogIfNotExists(isNew = true, isFutureReplica = true, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + // remove cache to disable OffsetsForLeaderEpoch API + partition.futureLog.get.leaderEpochCache = None + + // this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error + val result = rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None) + assertEquals(1, result.size) + assertEquals(0L, result(new TopicPartition(topic, 0)).initOffset) + assertNotEquals(0, rm.replicaAlterLogDirsManager.fetcherThreadMap.size) + // make sure alter folder thread has processed the data Review Comment: alter folder thread => alter log dir thread? 
########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -205,6 +205,61 @@ class ReplicaManagerTest { when(cache.getAliveBrokerNodes(any[ListenerName])).thenReturn(aliveBrokers) } + @Test + def testMaybeAddLogDirFetchersWithoutEpochCache(): Unit = { + val dir1 = TestUtils.tempDir() + val dir2 = TestUtils.tempDir() + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put("log.dirs", dir1.getAbsolutePath + "," + dir2.getAbsolutePath) + val config = KafkaConfig.fromProps(props) + val logManager = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(new Properties())) + val metadataCache: MetadataCache = mock(classOf[MetadataCache]) + mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0))) + when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = logManager, + quotaManagers = quotaManager, + metadataCache = metadataCache, + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + val partition = rm.createPartition(new TopicPartition(topic, 0)) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + + rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(0) + .setLeaderEpoch(0) + .setIsr(Seq[Integer](0).asJava) + .setPartitionEpoch(0) + .setReplicas(Seq[Integer](0).asJava) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, Uuid.randomUuid()), + Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ()) + appendRecords(rm, new TopicPartition(topic, 0), + 
MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()), new SimpleRecord("second message".getBytes()))) + logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), dir2.getAbsolutePath) + + partition.createLogIfNotExists(isNew = true, isFutureReplica = true, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), None) + // remove cache to disable OffsetsForLeaderEpoch API + partition.futureLog.get.leaderEpochCache = None + + // this method should use hw of future log to create log dir fetcher. Otherwise, it causes offset mismatch error + rm.maybeAddLogDirFetchers(Set(partition), new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints), _ => None) + assertNotEquals(0, rm.replicaAlterLogDirsManager.fetcherThreadMap.size) + // make sure alter folder thread has processed the data + rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => t.doWork()) + assertEquals(Set.empty, rm.replicaAlterLogDirsManager.failedPartitions.partitions()) Review Comment: Hmm, we test that none of the partitions is in failed state, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org