dajac commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r658571696
##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
}
+ @Test
+ def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+ val args = Array("--bootstrap-server", "localhost:9092", "--group", group,
"--describe", "--offsets")
+ val groupService = consumerGroupService(args)
+
+ val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+ val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+ val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+ val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+ val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+ val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+ // Some topic's partitions gets valid OffsetAndMetada values, other gets
nulls values (negative integers) and others aren't defined
+ val commitedOffsets = Map(
+ testTopicPartition1 -> new OffsetAndMetadata(100),
+ testTopicPartition2 -> null,
+ testTopicPartition4 -> new OffsetAndMetadata(100),
+ testTopicPartition5 -> null,
+ ).asJava
Review comment:
Actually, it seems that we should always have `null` or an
`OffsetAndMetadata` here for each partition as the API always provided an
answer for the requested partitions.
`commitedOffsets` -> `committedOffsets`
##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
}
+ @Test
+ def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+ val args = Array("--bootstrap-server", "localhost:9092", "--group", group,
"--describe", "--offsets")
+ val groupService = consumerGroupService(args)
+
+ val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+ val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+ val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+ val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+ val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+ val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+ // Some topic's partitions gets valid OffsetAndMetada values, other gets
nulls values (negative integers) and others aren't defined
+ val commitedOffsets = Map(
+ testTopicPartition1 -> new OffsetAndMetadata(100),
+ testTopicPartition2 -> null,
+ testTopicPartition4 -> new OffsetAndMetadata(100),
+ testTopicPartition5 -> null,
+ ).asJava
+
+ val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,
System.currentTimeMillis, Optional.of(1))
+ val endOffsets = Map(
+ testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+ )
+ val assignedTopicPartitions = Set(testTopicPartition0,
testTopicPartition1, testTopicPartition2)
+ val unassignedTopicPartitions = Set(testTopicPartition3,
testTopicPartition4, testTopicPartition5)
+
+ val consumerGroupDescription = new ConsumerGroupDescription(group,
+ true,
+ Collections.singleton(new MemberDescription("member1",
Optional.of("instance1"), "client1", "host1", new
MemberAssignment(assignedTopicPartitions.asJava))),
+ classOf[RangeAssignor].getName,
+ ConsumerGroupState.STABLE,
+ new Node(1, "localhost", 9092))
+
+ def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+ val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{
case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp ->
OffsetSpec.latest).toMap
+ val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) =>
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp ->
OffsetSpec.latest).toMap
+ ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+ (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet ||
map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) &&
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+ }
+ }
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
+ .thenReturn(new
DescribeConsumerGroupsResult(Collections.singletonMap(group,
KafkaFuture.completedFuture(consumerGroupDescription))))
+ when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+ when(admin.listOffsets(offsetsArgMatcher, any()))
+ .thenReturn(new ListOffsetsResult(endOffsets.asJava))
Review comment:
I was not fully satisfied with providing all the `endOffsets` in all the
cases here so I took a bit of time to play with Mockito. I was able to make it
work as follow:
```
def offsetsArgMatcher(expectedPartitions: Set[TopicPartition]):
ArgumentMatcher[util.Map[TopicPartition, OffsetSpec]] = {
topicPartitionOffsets => topicPartitionOffsets != null &&
topicPartitionOffsets.keySet.asScala.equals(expectedPartitions)
}
when(admin.listOffsets(
ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
any()
)).thenReturn(new
ListOffsetsResult(endOffsets.view.filterKeys(assignedTopicPartitions.contains).toMap.asJava))
when(admin.listOffsets(
ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
any()
)).thenReturn(new
ListOffsetsResult(endOffsets.view.filterKeys(unassignedTopicPartitions.contains).toMap.asJava))
```
Note the `topicPartitionOffsets != null` condition. It seems that Mockito
call the argument matcher once with `null` somehow.
Interestingly, the test fails when I use this modified version. I think that
this is due to the fact that not all the end offsets are returned all the time
now and that `committedOffsets` must contain all the partitions. See my
previous comment about this.
What do you think?
##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,84 @@ class ConsumerGroupServiceTest {
verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
}
+ @Test
+ def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+ val args = Array("--bootstrap-server", "localhost:9092", "--group", group,
"--describe", "--offsets")
+ val groupService = consumerGroupService(args)
+
+ val testTopicPartition0 = new TopicPartition("testTopic1", 0);
+ val testTopicPartition1 = new TopicPartition("testTopic1", 1);
+ val testTopicPartition2 = new TopicPartition("testTopic1", 2);
+ val testTopicPartition3 = new TopicPartition("testTopic2", 0);
+ val testTopicPartition4 = new TopicPartition("testTopic2", 1);
+ val testTopicPartition5 = new TopicPartition("testTopic2", 2);
+
+ // Some topic's partitions gets valid OffsetAndMetada values, other gets
nulls values (negative integers) and others aren't defined
+ val commitedOffsets = Map(
+ testTopicPartition1 -> new OffsetAndMetadata(100),
+ testTopicPartition2 -> null,
+ testTopicPartition4 -> new OffsetAndMetadata(100),
+ testTopicPartition5 -> null,
+ ).asJava
+
+ val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100,
System.currentTimeMillis, Optional.of(1))
+ val endOffsets = Map(
+ testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo),
+ testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo),
+ )
+ val assignedTopicPartitions = Set(testTopicPartition0,
testTopicPartition1, testTopicPartition2)
+ val unassignedTopicPartitions = Set(testTopicPartition3,
testTopicPartition4, testTopicPartition5)
+
+ val consumerGroupDescription = new ConsumerGroupDescription(group,
+ true,
+ Collections.singleton(new MemberDescription("member1",
Optional.of("instance1"), "client1", "host1", new
MemberAssignment(assignedTopicPartitions.asJava))),
+ classOf[RangeAssignor].getName,
+ ConsumerGroupState.STABLE,
+ new Node(1, "localhost", 9092))
+
+ def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
+ val expectedOffsetsUnassignedTopics = commitedOffsets.asScala.filter{
case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp ->
OffsetSpec.latest).toMap
+ val expectedOffsetsAssignedTopics = endOffsets.filter{ case (tp, _) =>
assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp ->
OffsetSpec.latest).toMap
+ ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map =>
+ (map.keySet.asScala == expectedOffsetsUnassignedTopics.keySet ||
map.keySet.asScala == expectedOffsetsAssignedTopics.keySet) &&
map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec])
+ }
+ }
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
+ .thenReturn(new
DescribeConsumerGroupsResult(Collections.singletonMap(group,
KafkaFuture.completedFuture(consumerGroupDescription))))
+ when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+ when(admin.listOffsets(offsetsArgMatcher, any()))
+ .thenReturn(new ListOffsetsResult(endOffsets.asJava))
+
+ val (state, assignments) = groupService.collectGroupOffsets(group)
+ val returnedOffsets = assignments.map { results =>
+ results.map { assignment =>
+ new TopicPartition(assignment.topic.get, assignment.partition.get) ->
assignment.offset
+ }.toMap
+ }.getOrElse(Map.empty)
+ // Results should have information for all assigned topic partition (even
if there is not Offset's information at all, because they get fills with None)
+ // Results should have information only for unassigned topic partitions if
and only if there is information about them (including with null values)
+ val expectedOffsets = Map(
+ testTopicPartition0 -> None,
+ testTopicPartition1 -> Some(100),
+ testTopicPartition2 -> None,
+ testTopicPartition4 -> Some(100),
+ testTopicPartition5 -> None
+ )
+ assertEquals(Some("Stable"), state)
+ assertTrue(assignments.nonEmpty)
Review comment:
This assertion is not needed anymore. It is implicitly checked by the
next one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]