dajac commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1396140016
########## clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java: ########## @@ -83,4 +85,16 @@ public static ConsumerGroupDescribeRequest parse(ByteBuffer buffer, short versio version ); } + + public static List<ConsumerGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList( Review Comment: Could we add a unit test for this new method? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6465,12 +6465,42 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + requestChannelRequest.context, + consumerGroupDescribeRequestData.groupIds +// any[RequestContext], +// any[util.List[String]] + )).thenReturn(future) + + createKafkaApis( + overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true") + ).handle(requestChannelRequest, RequestLocal.NoCaching) + Review Comment: I suppose that you need to complete the future here before calling `verifyNoThrottling`. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -445,6 +446,42 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>(); + + for (String groupId: groupIds) { + Group group = groups.get(groupId, committedOffset); Review Comment: I wonder if we could follow a bit more what we did in `describeGroups`. Would it be possible? For instance, there we use `genericGroup` which handles the group type check as well. We could use a similar method for consumer groups, I think. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -525,6 +527,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return future; } + /** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe( + RequestContext context, + List<String> groupIds + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( + groupIds, + Errors.COORDINATOR_NOT_AVAILABLE + )); + } + + final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures = + new ArrayList<>(groupIds.size()); + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DescribeGroups for the empty group id. 
+ if (groupId == null) { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setErrorMessage(Errors.INVALID_GROUP_ID.message()) Review Comment: nit: Let's remove this one. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -545,6 +547,32 @@ public String currentAssignmentSummary() { ')'; } + public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) { + return new ConsumerGroupDescribeResponseData.Member() + .setMemberEpoch(memberEpoch) + .setMemberId(Uuid.fromString(memberId)) Review Comment: It may be better to change the member id field to a string as we use a string everywhere. I suppose that I got it wrong in the KIP. ########## clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java: ########## @@ -83,4 +85,16 @@ public static ConsumerGroupDescribeRequest parse(ByteBuffer buffer, short versio version ); } + + public static List<ConsumerGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList( + List<String> groupIds, + Errors error + ) { + return groupIds.stream() + .map(groupId -> new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(error.code()) + .setErrorMessage(error.message()) Review Comment: It would be better to remove the message here. We usually don't put it if it is the default one. 
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6465,12 +6465,42 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + requestChannelRequest.context, + consumerGroupDescribeRequestData.groupIds +// any[RequestContext], +// any[util.List[String]] + )).thenReturn(future) + + createKafkaApis( + overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true") + ).handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) + + val describedGroups = List(new DescribedGroup()).asJava Review Comment: Could we add more groups with their info (e.g. id, etc.)? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -445,6 +446,42 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>(); + + for (String groupId: groupIds) { + Group group = groups.get(groupId, committedOffset); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId); + + if (group == null || !CONSUMER.equals(group.type())) { + // We don't support upgrading/downgrading between protocols at the moment so + // we set an error if a group exists with the wrong type. + describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message()); Review Comment: I think that we should return `GroupIdNotFoundException` and we can also remove the error message here. 
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6206,12 +6206,40 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + + createKafkaApis( + overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true") + ).handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) + + val describedGroups = List(new DescribedGroup()).asJava + val consumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() + .setGroups(describedGroups) + future.complete(describedGroups) + + assertEquals(consumerGroupDescribeResponseData, response.data) + } + Review Comment: +1 ########## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ########## @@ -601,4 +601,13 @@ private[group] class GroupCoordinatorAdapter( override def shutdown(): Unit = { coordinator.shutdown() } + + override def consumerGroupDescribe( Review Comment: Should we add a unit test for this new method? 
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6465,12 +6465,42 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) Review Comment: It would be great to use more groups (2 or 3). ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -445,6 +446,42 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>(); + + for (String groupId: groupIds) { + Group group = groups.get(groupId, committedOffset); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId); + + if (group == null || !CONSUMER.equals(group.type())) { + // We don't support upgrading/downgrading between protocols at the moment so + // we set an error if a group exists with the wrong type. + describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message()); + describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code()); + } else { + ConsumerGroup consumerGroup = (ConsumerGroup) group; + describedGroup.setGroupState(consumerGroup.stateAsString()) + .setGroupEpoch(consumerGroup.groupEpoch()) + .setAssignmentEpoch(consumerGroup.assignmentEpoch()) + .setAssignorName(consumerGroup.preferredServerAssignor().isPresent() ? 
+ consumerGroup.preferredServerAssignor().get() : ""); + consumerGroup.members().forEach( + (id, member) -> describedGroup.members().add(member.asConsumerGroupDescribeMember(consumerGroup.targetAssignment(member.memberId()))) Review Comment: All the reads here must take into account the `committedOffset`. It may be better to add an `asDescribedGroup` method to the consumer group to prepare it. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8660,6 +8665,68 @@ public void testListGroups() { assertEquals(expectAllGroupMap, actualAllGroupMap); } + @Test + public void testConsumerGroupDescribeNoErrors() { + String consumerGroupId = "consumerGroupId"; + int epoch = 10; + String memberId = Uuid.randomUuid().toString(); + String topicName = "topicName"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch)) + .build(); + + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(Collections.singletonList(topicName)); + context.replay(RecordHelpers.newMemberSubscriptionRecord( + consumerGroupId, + memberBuilder.build() + )); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1)); + + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId)); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup(); + describedGroup.setGroupEpoch(epoch + 1); + describedGroup.setGroupId(consumerGroupId); + describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember())); + describedGroup.setAssignorName(null); + 
describedGroup.setGroupState("assigning"); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList( + describedGroup + ); + + assertEquals(expected, actual); + } Review Comment: +1. It would be great if we could do this. You can take a look at how we did it in other tests. If we are blocked, ping me on slack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org