Parkerhiphop commented on code in PR #20082:
URL: https://github.com/apache/kafka/pull/20082#discussion_r2659287628
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -5360,6 +5360,71 @@ class ReplicaManagerTest {
assertEquals(expectedTopicId, fetchState.get.topicId)
}
+ @Test
+ def testReplicaAlterLogDirsMultipleReassignmentDoesNotBlockLogCleaner():
Unit = {
+ val localId = 0
+ val tp = new TopicPartition(topic, 0)
+ val tpId = new TopicIdPartition(topicId, tp)
+
+ val props = TestUtils.createBrokerConfig(localId)
+ val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
+ val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
+ val path3 = TestUtils.tempRelativeDir("data3").getAbsolutePath
+ props.put("log.dirs", Seq(path1, path2, path3).mkString(","))
+ val config = KafkaConfig.fromProps(props)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.asScala.map(new
File(_)), cleanerConfig = new CleanerConfig(true))
+ mockLogMgr.startup(Set())
+ val replicaManager = new ReplicaManager(
+ metrics = metrics,
+ config = config,
+ time = time,
+ scheduler = new MockScheduler(time),
+ logManager = mockLogMgr,
+ quotaManagers = quotaManager,
+ metadataCache = metadataCache,
+ logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+ alterPartitionManager = alterPartitionManager,
+ addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
+
+ try {
+ val spiedPartition = spy(Partition(tpId, time, replicaManager))
+ replicaManager.addOnlinePartition(tp, spiedPartition)
+
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
partitions = List(0, 1), List.empty, topic, topicIds(topic))
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
+
+ // Move the replica to the second log directory.
+ val partition = replicaManager.getPartitionOrException(tp)
+ val firstLogDir = partition.log.get.dir.getParentFile
+ val newReplicaFolder = replicaManager.logManager.liveLogDirs.filterNot(_
== firstLogDir).head
+ replicaManager.alterReplicaLogDirs(Map(tp ->
newReplicaFolder.getAbsolutePath))
+
+ // Prevent promotion of future replica
+
doReturn(false).when(spiedPartition).maybeReplaceCurrentWithFutureReplica()
Review Comment:
Hi, I've opened #21244 to fix this flaky test.
Thanks for the detailed explanation here; it was very helpful!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]