AndrewJSchofield commented on code in PR #19328: URL: https://github.com/apache/kafka/pull/19328#discussion_r2028378471
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1365,6 +1368,67 @@ public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGrou ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData() .setGroupId(requestData.groupId()) .setTopics(readStateSummaryData); + + return readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList); + } + + /** + * See {@link GroupCoordinator#describeShareGroupAllOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}. + */ + @Override + public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets( + RequestContext context, + DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture( + DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE)); + } + + if (metadataImage == null) { + return CompletableFuture.completedFuture( + DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE)); + } + + return runtime.scheduleReadOperation( + "share-group-initialized-partitions", + topicPartitionFor(requestData.groupId()), + (coordinator, offset) -> coordinator.initializedShareGroupPartitions(requestData.groupId()) + ).thenCompose(topicPartitionMap -> { + Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>(); + List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList = new ArrayList<>(topicPartitionMap.size()); + ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId(requestData.groupId()); + topicPartitionMap.forEach((topicId, partitionSet) -> { + String topicName = metadataImage.topics().topicIdToNameView().get(topicId); + if (topicName != null) { + requestTopicIdToNameMapping.put(topicId, topicName); + readSummaryRequestData.topics().add(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(topicId) + .setPartitions( + partitionSet.stream().map( + partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex) + ).toList() + )); + } + }); + return readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList); + }); Review Comment: There is already exception handling inside `readShareGroupStateSummary` and there is unit testing of it (`testDescribeShareGroupAllOffsetsThrowsError` for example). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org