dajac commented on a change in pull request #10858: URL: https://github.com/apache/kafka/pull/10858#discussion_r657158355
########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition + ).asJava Review comment: I would remove all the comments and only put one before `offsets` which explains that certain partitions are not present and certain have `null` or something along these lines. 
########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition + ).asJava + + val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = 
Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) + val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + + def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { Review comment: `describeGroupsResult` is used only once in the test. It seems that we could simply declare a variable which contains the result that we want to return. ########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition + ).asJava + + val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, 
Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) + val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet Review comment: So here, we have `testTopicPartition4` and `testTopicPartition5` because `testTopicPartition3` is not in `offsets`. Is it expected to not have `testTopicPartition3`? Personally, I would rather prefer to list the partition explicitly here. It makes the test easier to read in my opinion. ########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a 
assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition + ).asJava + + val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) + val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + + def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { + val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava)) + val description = new ConsumerGroupDescription(group, + true, + Collections.singleton(member1), + classOf[RangeAssignor].getName, + groupState, + new Node(1, "localhost", 9092)) + new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description))) + } + + def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, 
OffsetSpec]] { map => + map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } Review comment: I suppose that we are using an argument matcher because `OffsetSpec` is not comparable. Am I getting this right? If this is the case, as `offsetsArgMatcherUnassignedTopics` and `offsetsArgMatcherAssignedTopics` are nearly identical, I wonder if we could define an argument matcher that receives the expected `Map` as an argument. Would this be possible? ########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition + ).asJava + + val resultInfo = new
ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) + val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + + def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { + val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava)) + val description = new ConsumerGroupDescription(group, + true, + Collections.singleton(member1), + classOf[RangeAssignor].getName, + groupState, + new Node(1, "localhost", 9092)) + new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description))) + } + + def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => + map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } + def offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, 
OffsetSpec]] { map => + map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE)) + when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)) + doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any()) + doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any()) Review comment: Any reasons why we don't use `when(...).thenReturn(...)` here as well? It seems that it should work as well. ########## File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala ########## @@ -62,6 +62,92 @@ class ConsumerGroupServiceTest { verify(admin, times(1)).listOffsets(offsetsArgMatcher, any()) } + @Test + def testAdminRequestsForDescribeNegativeOffsets(): Unit = { + val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets") + val groupService = consumerGroupService(args) + + val testTopicPartition0 = new TopicPartition("testTopic1", 0); + val testTopicPartition1 = new TopicPartition("testTopic1", 1); + val testTopicPartition2 = new TopicPartition("testTopic1", 2); + val testTopicPartition3 = new TopicPartition("testTopic2", 0); + val testTopicPartition4 = new TopicPartition("testTopic2", 1); + val testTopicPartition5 = new TopicPartition("testTopic2", 2); + + val offsets = Map( + //testTopicPartition0 -> there is no offset information for an asssigned topic partition + testTopicPartition1 -> new OffsetAndMetadata(100), // regular information for a assigned partition + testTopicPartition2 -> null, //there is a null value for an asssigned topic partition + // testTopicPartition3 -> there is 
no offset information for an unasssigned topic partition + testTopicPartition4 -> new OffsetAndMetadata(100), // regular information for a unassigned partition + testTopicPartition5 -> null, //there is a null value for an unasssigned topic partition + ).asJava + + val resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis, Optional.of(1)) + val endOffsets = Map( + testTopicPartition0 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition1 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition2 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition3 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition4 -> KafkaFuture.completedFuture(resultInfo), + testTopicPartition5 -> KafkaFuture.completedFuture(resultInfo), + ) + val assignedTopicPartitions = Set(testTopicPartition0, testTopicPartition1, testTopicPartition2 ) + val unassignedTopicPartitions = offsets.asScala.filterNot { case (tp, _) => assignedTopicPartitions.contains(tp) }.toMap.keySet + + def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = { + val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions.asJava)) + val description = new ConsumerGroupDescription(group, + true, + Collections.singleton(member1), + classOf[RangeAssignor].getName, + groupState, + new Node(1, "localhost", 9092)) + new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description))) + } + + def offsetsArgMatcherAssignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = endOffsets.filter{ case (tp, _) => assignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => + map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } + def 
offsetsArgMatcherUnassignedTopics: util.Map[TopicPartition, OffsetSpec] = { + val expectedOffsets = offsets.asScala.filter{ case (tp, _) => unassignedTopicPartitions.contains(tp) }.keySet.map(tp => tp -> OffsetSpec.latest).toMap + ArgumentMatchers.argThat[util.Map[TopicPartition, OffsetSpec]] { map => + map.keySet.asScala == expectedOffsets.keySet && map.values.asScala.forall(_.isInstanceOf[OffsetSpec.LatestSpec]) + } + } + when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any())) + .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE)) + when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())) + .thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)) + doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherAssignedTopics, any()) + doAnswer(_ => new ListOffsetsResult(endOffsets.asJava)).when(admin).listOffsets(offsetsArgMatcherUnassignedTopics, any()) + + val (state, assignments) = groupService.collectGroupOffsets(group) + assertEquals(Some("Stable"), state) + assertTrue(assignments.nonEmpty) + // Results should have information for all assigned topic partition (even if there is not Offset's information at all, because they get fills with None) + // Results should have information only for unassigned topic partitions if and only if there is information about them (included with null values) + assertEquals(assignedTopicPartitions.size + unassignedTopicPartitions.size , assignments.get.size) + assignments.map( results => + results.map( partitionAssignmentState => + (partitionAssignmentState.topic, partitionAssignmentState.partition) match { + case (Some("testTopic1"), Some(0)) => assertEquals(None, partitionAssignmentState.offset) + case (Some("testTopic1"), Some(1)) => assertEquals(Some(100), partitionAssignmentState.offset) + case (Some("testTopic1"), Some(2)) => assertEquals(None, partitionAssignmentState.offset) + case (Some("testTopic2"), 
Some(1)) => assertEquals(Some(100), partitionAssignmentState.offset) + case (Some("testTopic2"), Some(2)) => assertEquals(None, partitionAssignmentState.offset) + case _ => assertTrue(false) + })) Review comment: I am not a fan of doing assertions like this. I would prefer to use `assertEquals` if possible because the error message is more meaningful in case of failure. We could do something like this: ``` returnedOffsets = assignmentsOpt.map { assignments => assignments.map { assignment => new TopicPartition(....) -> assignment.offset }.toMap }.getOrElse(Map.empty) expectedOffsets = Map( ... ) assertEquals(expectedOffsets, returnedOffsets) ``` This would replace all the assertions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org