dongnuo123 commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1375454303
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3686,8 +3686,51 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { - requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val response = new ConsumerGroupDescribeResponseData() + + val authorizedGroups = new ArrayBuffer[String]() + consumerGroupDescribeRequest.data.groupIds.forEach { groupId => + if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { + response.groups.add(new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code) + .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message) + ) + } else { + authorizedGroups += groupId + } + } + + val future = groupCoordinator.consumerGroupDescribe( + request.context, + authorizedGroups.asJava + ) + + future.handle[Unit] { (results, exception) => Review Comment: nit: Have we used `future` anywhere else? Could we write it like ``` groupCoordinator.consumerGroupDescribe( request.context, authorizedGroups.asJava ).handle[Unit] {...} ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -445,6 +446,44 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>(); + + for (String groupId: groupIds) { + Group group = groups.get(groupId, committedOffset); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId); + + if (group == null || !CONSUMER.equals(group.type())) { + // We don't support upgrading/downgrading between protocols at the moment so + // we set an error if a group exists with the wrong type. + describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message()); + describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code()); + } else { + ConsumerGroup consumerGroup = (ConsumerGroup) group; + describedGroup.setGroupState(consumerGroup.stateAsString()) + .setGroupEpoch(consumerGroup.groupEpoch()) + .setAssignmentEpoch(consumerGroup.assignmentEpoch()) + .setAssignorName( + consumerGroup.preferredServerAssignor().isPresent() ? + consumerGroup.preferredServerAssignor().get() : null + ); + consumerGroup.members().forEach( + (id, member) -> describedGroup.members().add(member.asConsumerGroupDescribeMember()) + ); Review Comment: Attributes like group state, epoch, members are timeline object in ConsumerGroup. You can get them according to the committedOffset. An example is `public String stateAsString(long committedOffset)` in ConsumerGroup. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8660,6 +8665,68 @@ public void testListGroups() { assertEquals(expectAllGroupMap, actualAllGroupMap); } + @Test + public void testConsumerGroupDescribeNoErrors() { + String consumerGroupId = "consumerGroupId"; + int epoch = 10; + String memberId = Uuid.randomUuid().toString(); + String topicName = "topicName"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch)) + .build(); + + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(Collections.singletonList(topicName)); + context.replay(RecordHelpers.newMemberSubscriptionRecord( + consumerGroupId, + memberBuilder.build() + )); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1)); + + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId)); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup(); + describedGroup.setGroupEpoch(epoch + 1); + describedGroup.setGroupId(consumerGroupId); + describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember())); + describedGroup.setAssignorName(null); + describedGroup.setGroupState("assigning"); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList( + describedGroup + ); + + assertEquals(expected, actual); + } Review Comment: Could we test more on describe the groups by committedOffset? For instance, we've added a group but not updated the last committed offset, then we shouldn't be able to describe this group. After updating the last committed offset, we're able to describe the group. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -444,6 +445,43 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe( + List<String> groupIds + ) { + List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>(); + + for (String groupId: groupIds) { + Group group = groups.get(groupId); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId); + + if (group == null || !CONSUMER.equals(group.type())) { + // We don't support upgrading/downgrading between protocols at the moment so + // we set an error if a group exists with the wrong type. + describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message()); + describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code()); + } else { + ConsumerGroup consumerGroup = (ConsumerGroup) group; + describedGroup.setGroupState(consumerGroup.stateAsString()) + .setGroupEpoch(consumerGroup.groupEpoch()) + .setAssignmentEpoch(consumerGroup.assignmentEpoch()) + .setAssignorName( + consumerGroup.preferredServerAssignor().isPresent() ? + consumerGroup.preferredServerAssignor().get() : null Review Comment: I think yes though we need to confirm with @dajac ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6206,12 +6206,40 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + + createKafkaApis( + overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true") + ).handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) + + val describedGroups = List(new DescribedGroup()).asJava + val consumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() + .setGroups(describedGroups) + future.complete(describedGroups) + + assertEquals(consumerGroupDescribeResponseData, response.data) + } + Review Comment: Could we add some extra tests to test request with unauthorized groups and future that completes with exception? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -1046,6 +1047,112 @@ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception { assertEquals(expectedResponse, future.get()); } + @Test + public void testConsumerGroupDescribe() throws InterruptedException, ExecutionException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + int partitionCount = 2; + service.startup(() -> partitionCount); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id-1"); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id-2"); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( + describedGroup1, + describedGroup2 + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("consumer-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); + + CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>(); + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("consumer-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)), + ArgumentMatchers.any() + )).thenReturn(describedGroupFuture); + + CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future = + service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), Arrays.asList("group-id-1", "group-id-2")); + + assertFalse(future.isDone()); + describedGroupFuture.complete(Collections.singletonList(describedGroup2)); + assertEquals(expectedDescribedGroups, future.get()); + } + + @Test + public void testConsumerGroupDescribeInvalidGroupId() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + int partitionCount = 1; + service.startup(() -> partitionCount); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(""); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedDescribedGroups = Arrays.asList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setErrorMessage(Errors.INVALID_GROUP_ID.message()), + describedGroup + ); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("consumer-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup))); + + CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future = + service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), Arrays.asList("", null)); + + assertEquals(expectedDescribedGroups, future.get()); + } + + @Test + public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + int partitionCount = 1; + service.startup(() -> partitionCount); + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("consumer-group-describe"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture( + new CoordinatorLoadInProgressException(null) + )); + + CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future = + service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), Collections.singletonList("group-id")); + + assertEquals( + Collections.singletonList(new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id") + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setErrorMessage(Errors.COORDINATOR_LOAD_IN_PROGRESS.message()) + ), + future.get() + ); + } + Review Comment: Could we add another test when the coordinator is not active? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -497,6 +499,66 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return future; } + /** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe( + RequestContext context, + List<String> groupIds + ) { + if (!isActive.get()) { + return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); + } Review Comment: We should return a completed future with a response whose error code set to Errors.COORDINATOR_NOT_AVAILABLE.code(). We'll fix this in the other apis in https://github.com/apache/kafka/pull/14589 ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6206,12 +6206,40 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + + createKafkaApis( + overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true") + ).handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) + + val describedGroups = List(new DescribedGroup()).asJava + val consumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() + .setGroups(describedGroups) + future.complete(describedGroups) Review Comment: I guess the mock not invoked error is because the future is not completed when sending the response (calling `verifyNoThrottling`). Some mock sending the response is not invoked and cause the error. Something like the following should help. ``` val describedGroups = List(new DescribedGroup()).asJava val consumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() .setGroups(describedGroups) future.complete(describedGroups) val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) assert... ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -545,6 +547,26 @@ public String currentAssignmentSummary() { ')'; } + public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember() { + return new ConsumerGroupDescribeResponseData.Member() + .setMemberEpoch(memberEpoch) + .setMemberId(Uuid.fromString(memberId)) + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions( + assignedPartitions.entrySet().stream().map( + item -> new ConsumerGroupDescribeResponseData.TopicPartitions() + .setTopicId(item.getKey()) + .setPartitions(new ArrayList<>(item.getValue())) Review Comment: I guess we don't need to set the topic name here. Need to confirm with @dajac ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6206,12 +6206,40 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) Review Comment: You can change it back to the way it was. This is actually invoked. See comment on line 6226. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -545,6 +547,26 @@ public String currentAssignmentSummary() { ')'; } + public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember() { + return new ConsumerGroupDescribeResponseData.Member() + .setMemberEpoch(memberEpoch) + .setMemberId(Uuid.fromString(memberId)) + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions( + assignedPartitions.entrySet().stream().map( + item -> new ConsumerGroupDescribeResponseData.TopicPartitions() + .setTopicId(item.getKey()) + .setPartitions(new ArrayList<>(item.getValue())) + ).collect(Collectors.toList()) + )) + .setClientHost(clientHost) + .setClientId(clientId) + .setInstanceId(instanceId) + .setRackId(rackId) + .setSubscribedTopicNames(subscribedTopicNames) + .setSubscribedTopicRegex(subscribedTopicRegex); Review Comment: We have target assignments stored in ConsumerGroup. Can we pass it in as a parameter or use anyway that helps it access the `targetAssignment` in ConsumerGroup? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org