clolov commented on code in PR #14238: URL: https://github.com/apache/kafka/pull/14238#discussion_r1299948032
########## core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala: ########## @@ -539,4 +545,52 @@ class DynamicConfigChangeUnitTest { //Then assertEquals(Seq(), result) } + + @Test + def testEnableRemoteLogStorageOnTopic(): Unit = { + val topic = "test-topic" + val tp = new TopicPartition(topic, 0) + val partition: Partition = mock(classOf[Partition]) + when(partition.isLeader).thenReturn(true) + val rlm: RemoteLogManager = mock(classOf[RemoteLogManager]) + val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) + val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) + doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), followerPartitionsArg.capture(), any()) + val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.remoteLogManager).thenReturn(Some(rlm)) + when(replicaManager.onlinePartition(tp)).thenReturn(Some(partition)) + val log: UnifiedLog = mock(classOf[UnifiedLog]) + when(log.remoteLogEnabled()).thenReturn(true) + when(log.topicPartition).thenReturn(tp) + + val isRemoteLogEnabledBeforeUpdate = false + val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, null) + configHandler.maybeBootstrapRemoteLogComponents(Seq(log), isRemoteLogEnabledBeforeUpdate) + assertTrue(followerPartitionsArg.getValue.isEmpty) + assertEquals(Collections.singleton(partition), leaderPartitionsArg.getValue) + } + + @Test + def testDisableRemoteLogStorageOnTopic(): Unit = { + val topic = "test-topic" + val tp = new TopicPartition(topic, 0) + val partition: Partition = mock(classOf[Partition]) + when(partition.isLeader).thenReturn(true) + when(partition.isLeader).thenReturn(false) + val rlm: RemoteLogManager = mock(classOf[RemoteLogManager]) + doNothing().when(rlm).stopPartitions(ArgumentMatchers.eq(Set(tp).asJava), ArgumentMatchers.eq(true), any()) + doNothing().when(rlm).stopPartitions(ArgumentMatchers.eq(Set(tp).asJava), ArgumentMatchers.eq(false), any()) Review Comment: Similar question to the above ########## core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala: ########## @@ -539,4 +545,52 @@ class DynamicConfigChangeUnitTest { //Then assertEquals(Seq(), result) } + + @Test + def testEnableRemoteLogStorageOnTopic(): Unit = { + val topic = "test-topic" + val tp = new TopicPartition(topic, 0) + val partition: Partition = mock(classOf[Partition]) + when(partition.isLeader).thenReturn(true) + val rlm: RemoteLogManager = mock(classOf[RemoteLogManager]) + val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) + val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) + doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), followerPartitionsArg.capture(), any()) + val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.remoteLogManager).thenReturn(Some(rlm)) + when(replicaManager.onlinePartition(tp)).thenReturn(Some(partition)) + val log: UnifiedLog = mock(classOf[UnifiedLog]) + when(log.remoteLogEnabled()).thenReturn(true) + when(log.topicPartition).thenReturn(tp) + + val isRemoteLogEnabledBeforeUpdate = false + val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, null) + configHandler.maybeBootstrapRemoteLogComponents(Seq(log), isRemoteLogEnabledBeforeUpdate) + assertTrue(followerPartitionsArg.getValue.isEmpty) + assertEquals(Collections.singleton(partition), leaderPartitionsArg.getValue) + } + + @Test + def testDisableRemoteLogStorageOnTopic(): Unit = { + val topic = "test-topic" + val tp = new TopicPartition(topic, 0) + val partition: Partition = mock(classOf[Partition]) + when(partition.isLeader).thenReturn(true) + when(partition.isLeader).thenReturn(false) Review Comment: If you want the first invocation to return true and the second to return false I think the correct syntax is: ``` when(partition.isLeader).thenReturn(true).thenReturn(false) ``` ########## core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala: ########## @@ -539,4 +545,52 @@ class DynamicConfigChangeUnitTest { //Then assertEquals(Seq(), result) } + + @Test + def testEnableRemoteLogStorageOnTopic(): Unit = { + val topic = "test-topic" + val tp = new TopicPartition(topic, 0) + val partition: Partition = mock(classOf[Partition]) + when(partition.isLeader).thenReturn(true) + val rlm: RemoteLogManager = mock(classOf[RemoteLogManager]) + val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) + val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = ArgumentCaptor.forClass(classOf[util.Set[Partition]]) + doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), followerPartitionsArg.capture(), any()) + val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) + when(replicaManager.remoteLogManager).thenReturn(Some(rlm)) + when(replicaManager.onlinePartition(tp)).thenReturn(Some(partition)) + val log: UnifiedLog = mock(classOf[UnifiedLog]) + when(log.remoteLogEnabled()).thenReturn(true) + when(log.topicPartition).thenReturn(tp) + + val isRemoteLogEnabledBeforeUpdate = false + val configHandler: TopicConfigHandler = new TopicConfigHandler(replicaManager, null, null, null) + configHandler.maybeBootstrapRemoteLogComponents(Seq(log), isRemoteLogEnabledBeforeUpdate) + assertTrue(followerPartitionsArg.getValue.isEmpty) + assertEquals(Collections.singleton(partition), leaderPartitionsArg.getValue) + } + + @Test + def testDisableRemoteLogStorageOnTopic(): Unit = { + val topic = "test-topic" + val tp = new TopicPartition(topic, 0) + val partition: Partition = mock(classOf[Partition]) + when(partition.isLeader).thenReturn(true) + when(partition.isLeader).thenReturn(false) Review Comment: I was under the impression that this line will completely override the previous one, can you whether the previous line does something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org