clolov commented on code in PR #14238:
URL: https://github.com/apache/kafka/pull/14238#discussion_r1299948032


##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -539,4 +545,52 @@ class DynamicConfigChangeUnitTest {
     //Then
     assertEquals(Seq(), result)
   }
+
+  @Test
+  def testEnableRemoteLogStorageOnTopic(): Unit = {
+    val topic = "test-topic"
+    val tp = new TopicPartition(topic, 0)
+    val partition: Partition = mock(classOf[Partition])
+    when(partition.isLeader).thenReturn(true)
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), 
followerPartitionsArg.capture(), any())
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+    when(replicaManager.onlinePartition(tp)).thenReturn(Some(partition))
+    val log: UnifiedLog = mock(classOf[UnifiedLog])
+    when(log.remoteLogEnabled()).thenReturn(true)
+    when(log.topicPartition).thenReturn(tp)
+
+    val isRemoteLogEnabledBeforeUpdate = false
+    val configHandler: TopicConfigHandler = new 
TopicConfigHandler(replicaManager, null, null, null)
+    configHandler.maybeBootstrapRemoteLogComponents(Seq(log), 
isRemoteLogEnabledBeforeUpdate)
+    assertTrue(followerPartitionsArg.getValue.isEmpty)
+    assertEquals(Collections.singleton(partition), 
leaderPartitionsArg.getValue)
+  }
+
+  @Test
+  def testDisableRemoteLogStorageOnTopic(): Unit = {
+    val topic = "test-topic"
+    val tp = new TopicPartition(topic, 0)
+    val partition: Partition = mock(classOf[Partition])
+    when(partition.isLeader).thenReturn(true)
+    when(partition.isLeader).thenReturn(false)
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    doNothing().when(rlm).stopPartitions(ArgumentMatchers.eq(Set(tp).asJava), 
ArgumentMatchers.eq(true), any())
+    doNothing().when(rlm).stopPartitions(ArgumentMatchers.eq(Set(tp).asJava), 
ArgumentMatchers.eq(false), any())

Review Comment:
   Similar question to the above



##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -539,4 +545,52 @@ class DynamicConfigChangeUnitTest {
     //Then
     assertEquals(Seq(), result)
   }
+
+  @Test
+  def testEnableRemoteLogStorageOnTopic(): Unit = {
+    val topic = "test-topic"
+    val tp = new TopicPartition(topic, 0)
+    val partition: Partition = mock(classOf[Partition])
+    when(partition.isLeader).thenReturn(true)
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), 
followerPartitionsArg.capture(), any())
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+    when(replicaManager.onlinePartition(tp)).thenReturn(Some(partition))
+    val log: UnifiedLog = mock(classOf[UnifiedLog])
+    when(log.remoteLogEnabled()).thenReturn(true)
+    when(log.topicPartition).thenReturn(tp)
+
+    val isRemoteLogEnabledBeforeUpdate = false
+    val configHandler: TopicConfigHandler = new 
TopicConfigHandler(replicaManager, null, null, null)
+    configHandler.maybeBootstrapRemoteLogComponents(Seq(log), 
isRemoteLogEnabledBeforeUpdate)
+    assertTrue(followerPartitionsArg.getValue.isEmpty)
+    assertEquals(Collections.singleton(partition), 
leaderPartitionsArg.getValue)
+  }
+
+  @Test
+  def testDisableRemoteLogStorageOnTopic(): Unit = {
+    val topic = "test-topic"
+    val tp = new TopicPartition(topic, 0)
+    val partition: Partition = mock(classOf[Partition])
+    when(partition.isLeader).thenReturn(true)
+    when(partition.isLeader).thenReturn(false)

Review Comment:
   If you want the first invocation to return true and the second to return 
false I think the correct syntax is:
   ```
   when(partition.isLeader).thenReturn(true).thenReturn(false)
   ```



##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -539,4 +545,52 @@ class DynamicConfigChangeUnitTest {
     //Then
     assertEquals(Seq(), result)
   }
+
+  @Test
+  def testEnableRemoteLogStorageOnTopic(): Unit = {
+    val topic = "test-topic"
+    val tp = new TopicPartition(topic, 0)
+    val partition: Partition = mock(classOf[Partition])
+    when(partition.isLeader).thenReturn(true)
+    val rlm: RemoteLogManager = mock(classOf[RemoteLogManager])
+    val leaderPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    val followerPartitionsArg: ArgumentCaptor[util.Set[Partition]] = 
ArgumentCaptor.forClass(classOf[util.Set[Partition]])
+    doNothing().when(rlm).onLeadershipChange(leaderPartitionsArg.capture(), 
followerPartitionsArg.capture(), any())
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+    when(replicaManager.remoteLogManager).thenReturn(Some(rlm))
+    when(replicaManager.onlinePartition(tp)).thenReturn(Some(partition))
+    val log: UnifiedLog = mock(classOf[UnifiedLog])
+    when(log.remoteLogEnabled()).thenReturn(true)
+    when(log.topicPartition).thenReturn(tp)
+
+    val isRemoteLogEnabledBeforeUpdate = false
+    val configHandler: TopicConfigHandler = new 
TopicConfigHandler(replicaManager, null, null, null)
+    configHandler.maybeBootstrapRemoteLogComponents(Seq(log), 
isRemoteLogEnabledBeforeUpdate)
+    assertTrue(followerPartitionsArg.getValue.isEmpty)
+    assertEquals(Collections.singleton(partition), 
leaderPartitionsArg.getValue)
+  }
+
+  @Test
+  def testDisableRemoteLogStorageOnTopic(): Unit = {
+    val topic = "test-topic"
+    val tp = new TopicPartition(topic, 0)
+    val partition: Partition = mock(classOf[Partition])
+    when(partition.isLeader).thenReturn(true)
+    when(partition.isLeader).thenReturn(false)

Review Comment:
   I was under the impression that this line will completely override the 
previous one, can you whether the previous line does something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to