dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r657158355



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+      // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+    ).asJava

Review comment:
       I would remove all the comments and only put one before `offsets` which 
explains that certain partitions are not present and certain have `null` or 
something along these lines.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+      // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {

Review comment:
       `describeGroupsResult` is used only once in the test. It seems that we 
could simply declare a variable which contains the result that we want to 
return.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+      // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet

Review comment:
       So here, we have `testTopicPartition4` and `testTopicPartition5` because 
`testTopicPartition3` is not in `offsets`. Is it expected to not have 
`testTopicPartition3`? Personally, I would rather prefer to list the partition 
explicitly here. It makes the test easier to read in my opinion.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+      // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] 
= {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }

Review comment:
       I suppose that we are using an argument matcher because `OffsetSpec` is 
not comparable. Am I getting this right?
   
   If this is the case, as `offsetsArgMatcherUnassignedTopics` and 
`offsetsArgMatcherAssignedTopics` are nearly identical, I wonder if we could 
define an argument matcher which received the expected `Map` as an argument. 
Would this be possible?

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+      // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] 
= {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, 
OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics,
 any())
+    doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics,
 any())

Review comment:
       Any reasons why we don't use `when(...).thenReturn(...)` here as well? 
It seems that it should work as well.

##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,92 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+    val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+    val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+    val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+    val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+    val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+    val offsets = Map(
+      //testTopicPartition0 -> there is no offset information for an asssigned 
topic partition
+      testTopicPartition1 -> new OffsetAndMetadata(100), // regular 
information for a assigned partition
+      testTopicPartition2 -> null, //there is a null value for an asssigned 
topic partition
+      // testTopicPartition3 ->  there is no offset information for an 
unasssigned topic partition
+      testTopicPartition4 -> new OffsetAndMetadata(100), // regular 
information for a unassigned partition
+      testTopicPartition5 -> null, //there is a null value for an unasssigned 
topic partition
+    ).asJava
+
+    val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, 
System.currentTimeMillis, Optional.of(1))
+    val endOffsets = Map(
+      testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+      testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+    )
+    val assignedTopicPartitions = Set(testTopicPartition0, 
testTopicPartition1, testTopicPartition2 )
+    val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) 
=> assignedTopicPartitions.contains(tp) }.toMap.keySet
+
+    def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
+      val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava))
+      val description = new ConsumerGroupDescription(group,
+        true,
+        Collections.singleton(member1),
+        classOf[RangeAssignor].getName,
+        groupState,
+        new Node(1, "localhost", 9092))
+      new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
+    }
+
+    def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] 
= {
+      val expectedOffsets = endOffsets.filter{ case (tp, _) => 
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, 
OffsetSpec] = {
+      val expectedOffsets = offsets.asScala.filter{ case (tp, _) => 
unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> 
OffsetSpec.latest).toMap
+      ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+        map.keySet.asScala == expectedOffsets.keySet && 
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+      }
+    }
+    
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets))
+    doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics,
 any())
+    doAnswer(_ => new 
ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics,
 any())
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    // Results should have information for all assigned topic partition (even 
if there is not Offset's information at all, because they get fills with None)
+    // Results should have information only for unassigned topic partitions if 
and only if there is information about them (included with null values)
+    assertEquals(assignedTopicPartitions.size + unassignedTopicPartitions.size 
, assignments.get.size)
+    assignments.map( results =>
+      results.map( partitionAssignmentState =>
+        (partitionAssignmentState.topic, partitionAssignmentState.partition) 
match {
+          case (Some("testTopic1"), Some(0)) => assertEquals(None, 
partitionAssignmentState.offset)
+          case (Some("testTopic1"), Some(1)) => assertEquals(Some(100), 
partitionAssignmentState.offset)
+          case (Some("testTopic1"), Some(2)) => assertEquals(None, 
partitionAssignmentState.offset)
+          case (Some("testTopic2"), Some(1)) => assertEquals(Some(100), 
partitionAssignmentState.offset)
+          case (Some("testTopic2"), Some(2)) => assertEquals(None, 
partitionAssignmentState.offset)
+          case _ => assertTrue(false)
+    }))

Review comment:
       I am not a fan of doing assertions like this. I would rather prefer to 
use `assertEquals` if possible because the error message is more meaningful in 
case of failure. We could do something like this:
   
   ```
   returnedOffsets = assignmentsOpt.map { assignments =>
     assignments.map { assignment => 
       new TopicPartition(....) -> partitionAssignmentState.offset
     }.toMap
   }.getOrElse(Map.empty)
   
   expectedOffsets = Map(
      ...
   )
   
   assertEquals(expectedOffsets, returnedOffsets)
   ```
   
   This would replace all the assertions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to