AndrewJSchofield commented on code in PR #19328:
URL: https://github.com/apache/kafka/pull/19328#discussion_r2028378471


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1365,6 +1368,67 @@ public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGrou
         ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
             .setGroupId(requestData.groupId())
             .setTopics(readStateSummaryData);
+
+        return readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+    }
+
+    /**
+     * See {@link GroupCoordinator#describeShareGroupAllOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
+     */
+    @Override
+    public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupAllOffsets(
+        RequestContext context,
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE));
+        }
+
+        if (metadataImage == null) {
+            return CompletableFuture.completedFuture(
+                DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), Errors.COORDINATOR_NOT_AVAILABLE));
+        }
+
+        return runtime.scheduleReadOperation(
+            "share-group-initialized-partitions",
+            topicPartitionFor(requestData.groupId()),
+            (coordinator, offset) -> coordinator.initializedShareGroupPartitions(requestData.groupId())
+        ).thenCompose(topicPartitionMap -> {
+            Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+            List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList = new ArrayList<>(topicPartitionMap.size());
+            ReadShareGroupStateSummaryRequestData readSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+                .setGroupId(requestData.groupId());
+            topicPartitionMap.forEach((topicId, partitionSet) -> {
+                String topicName = metadataImage.topics().topicIdToNameView().get(topicId);
+                if (topicName != null) {
+                    requestTopicIdToNameMapping.put(topicId, topicName);
+                    readSummaryRequestData.topics().add(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                        .setTopicId(topicId)
+                        .setPartitions(
+                            partitionSet.stream().map(
+                                partitionIndex -> new ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
+                            ).toList()
+                        ));
+                }
+            });
+            return readShareGroupStateSummary(readSummaryRequestData, requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+        });

Review Comment:
   `readShareGroupStateSummary` already handles exceptions internally, and that
handling is covered by unit tests (`testDescribeShareGroupAllOffsetsThrowsError`,
for example).
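
As a rough illustration of the pattern being described, here is a minimal sketch of a helper that handles failures internally and is then chained with `thenCompose`. It is a simplified stand-in and not the Kafka code: the class `ComposedErrorHandlingSketch`, the methods `readSummary` and `describeAll`, the `fail` flag, and the `NONE`/`ERROR` strings are all hypothetical names chosen for the example.

```java
import java.util.concurrent.CompletableFuture;

public class ComposedErrorHandlingSketch {

    // Hypothetical stand-in for a helper that handles failures itself:
    // a failed asynchronous call is mapped to an error result instead of
    // being propagated as an exceptional completion.
    static CompletableFuture<String> readSummary(String groupId, boolean fail) {
        CompletableFuture<String> backendCall = fail
            ? CompletableFuture.failedFuture(new IllegalStateException("backend unavailable"))
            : CompletableFuture.completedFuture("ok");
        return backendCall
            .thenApply(result -> groupId + ":NONE")
            .exceptionally(t -> groupId + ":ERROR");
    }

    // Hypothetical stand-in for the caller: it chains the helper with
    // thenCompose and adds no error handling of its own.
    static CompletableFuture<String> describeAll(String groupId, boolean fail) {
        return CompletableFuture.completedFuture(groupId)
            .thenCompose(id -> readSummary(id, fail));
    }

    public static void main(String[] args) {
        System.out.println(describeAll("g1", false).join());
        System.out.println(describeAll("g1", true).join());
    }
}
```

In this sketch, a failure inside the helper surfaces to the caller as a normal completion carrying an error value, so the composed future does not need a second handler for that case.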


