apoorvmittal10 commented on code in PR #18671:
URL: https://github.com/apache/kafka/pull/18671#discussion_r1935632909


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10441,6 +10443,125 @@ class KafkaApisTest extends Logging {
     })
   }
 
+  @Test
+  def testDescribeShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics(
+      util.List.of(new 
DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
+    )
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build())
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    response.data.responses.forEach(topic => 
topic.partitions().forEach(partition => 
assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode())))
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = {
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics(
+      util.List.of(new 
DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
+    )
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build())
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(Seq(AuthorizationResult.DENIED).asJava)

Review Comment:
   Can we avoid conversions?
   ```suggestion
         .thenReturn(util.List.of(AuthorizationResult.DENIED))
   ```



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java:
##########
@@ -212,17 +216,35 @@ public void testDescribeOffsetsOfAllExistingGroups() 
throws Exception {
                 ), 0)),
                 GroupState.STABLE,
                 new Node(0, "host1", 9090), 0, 0);
-            ListOffsetsResult resultOffsets = new ListOffsetsResult(
+            ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
                 Map.of(
-                    new TopicPartition("topic1", 0),
-                    KafkaFuture.completedFuture(new 
ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty()))
-                ));
+                    firstGroup,
+                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), 0L))
+                )
+            );
+            ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+                Map.of(
+                    secondGroup,
+                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), 0L))
+                )
+            );
 
             
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing,
 secondGroupListing)));
             
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
             
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, 
KafkaFuture.completedFuture(exp1), secondGroup, 
KafkaFuture.completedFuture(exp2)));
             
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-            
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets);
+            
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(new
 Answer<Object>() {
+                @Override
+                public Object answer(InvocationOnMock invocation) throws 
Throwable {
+                    Map<String, Object> argument = invocation.getArgument(0);
+                    if (argument.containsKey(firstGroup)) {
+                        return listShareGroupOffsetsResult1;
+                    } else if (argument.containsKey(secondGroup)) {
+                        return listShareGroupOffsetsResult2;
+                    }
+                    return null;
+                }
+            });

Review Comment:
   ```suggestion
               
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(
                   invocation -> {
                       Map<String, Object> argument = invocation.getArgument(0);
                       if (argument.containsKey(firstGroup)) {
                           return listShareGroupOffsetsResult1;
                       } else if (argument.containsKey(secondGroup)) {
                           return listShareGroupOffsetsResult2;
                       }
                       return null;
                   });
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2409,4 +2191,323 @@ public void 
testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti
             future.get()
         );
     }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithNoOpPersister() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                        .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
+                        
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE)))
+                )
+            );
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersister() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ReadShareGroupStateSummaryRequestData 
readShareGroupStateSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+                        .setPartition(partition)))));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(21)
+                        .setErrorCode(Errors.NONE.code())
+                        .setErrorMessage(Errors.NONE.message())))
+                )
+            );
+
+        ReadShareGroupStateSummaryResponseData 
readShareGroupStateSummaryResponseData = new 
ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setStartOffset(21)
+                        .setStateEpoch(1)
+                        .setErrorCode(Errors.NONE.code())
+                        .setErrorMessage(Errors.NONE.message())))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters 
readShareGroupStateSummaryParameters = 
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = 
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+            
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError() 
{
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.failedFuture(new Exception("Unable 
to validate read state summary request")));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(future, Exception.class, "Unable to validate read 
state summary request");
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(future, IllegalStateException.class, "Result is 
null for the read state summary");
+    }
+
+    @Test
+    public void 
testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
+            new 
ReadShareGroupStateSummaryResult.Builder().setTopicsData(null).build();
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            
.thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(future, IllegalStateException.class, "Result is 
null for the read state summary");
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build();
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(0)
+                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                        
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())))
+                )
+            );
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsMetadataImageNull() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build(true);
+
+        // Forcing a null Metadata Image
+        service.onNewMetadataImage(null, null);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(0)
+                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                        
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
+                )
+            );
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @FunctionalInterface
+    interface TriFunction<A, B, C, R> {

Review Comment:
   ```suggestion
       private interface TriFunction<A, B, C, R> {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the handler for {@link 
KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
+ */
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+
+    private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
+    private final Logger log;
+    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+    public ListShareGroupOffsetsHandler(
+        Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+        LogContext logContext) {
+        this.groupSpecs = groupSpecs;
+        this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
+        this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
+    }
+
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
+        return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
+    }
+
+    @Override
+    public String apiName() {
+        return "describeShareGroupOffsets";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    @Override
+    public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
+        List<String> groupIds = keys.stream().map(key -> {
+            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
+                throw new IllegalArgumentException("Invalid group coordinator 
key " + key +
+                    " when building `DescribeShareGroupOffsets` request");
+            }
+            return key.idValue;
+        }).collect(Collectors.toList());
+        // The DescribeShareGroupOffsetsRequest only includes a single group 
ID at this point, which is likely a mistake to be fixing a follow-on PR.
+        String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
+        if (groupId == null) {
+            throw new IllegalArgumentException("Missing group id in request");
+        }
+        ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+        
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic>
 topics =
+            spec.topicPartitions().stream().map(
+                topicPartition -> new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                    .setTopicName(topicPartition.topic())
+                    .setPartitions(List.of(topicPartition.partition()))
+            ).collect(Collectors.toList());
+        DescribeShareGroupOffsetsRequestData data = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId(groupId)
+            .setTopics(topics);
+        return new DescribeShareGroupOffsetsRequest.Builder(data, true);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> 
handleResponse(Node coordinator,
+                                                                               
Set<CoordinatorKey> groupIds,
+                                                                               
AbstractResponse abstractResponse) {
+        final DescribeShareGroupOffsetsResponse response = 
(DescribeShareGroupOffsetsResponse) abstractResponse;
+        final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+
+        for (CoordinatorKey groupId : groupIds) {
+            Map<TopicPartition, Long> data = new HashMap<>();
+            response.data().responses().stream().map(
+                describedTopic ->
+                    describedTopic.partitions().stream().map(
+                        partition ->
+                            data.put(new 
TopicPartition(describedTopic.topicName(), partition.partitionIndex()), 
partition.startOffset())
+                    ).collect(Collectors.toList())
+            ).collect(Collectors.toList());
+            completed.put(groupId, data);
+        }
+        return new ApiResult<>(completed, failed, new ArrayList<>());

Review Comment:
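   Nit: there is no need to allocate a new mutable list for the empty 
unmapped-keys argument. Note that this requires adding `import java.util.Collections;`, 
which is not among the imports shown above.
   ```suggestion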
           return new ApiResult<>(completed, failed, Collections.emptyList());
   ```



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -8970,4 +8975,85 @@ public void testRemoveRaftVoterRequest(boolean fail, 
boolean sendClusterId) thro
             assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), 
requestData.get().voterDirectoryId());
         }
     }
+
+    @Test
+    public void testListShareGroupOffsetsOptionsWithBatchedApi() {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster,
+            AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            final List<TopicPartition> partitions = 
Collections.singletonList(new TopicPartition("A", 0));
+            final ListShareGroupOffsetsOptions options = new 
ListShareGroupOffsetsOptions();
+
+            final ListShareGroupOffsetsSpec groupSpec = new 
ListShareGroupOffsetsSpec()
+                .topicPartitions(partitions);
+            Map<String, ListShareGroupOffsetsSpec> groupSpecs = new 
HashMap<>();
+            groupSpecs.put(GROUP_ID, groupSpec);
+
+            env.adminClient().listShareGroupOffsets(groupSpecs, options);
+
+            final MockClient mockClient = env.kafkaClient();
+            waitForRequest(mockClient, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+
+            ClientRequest clientRequest = mockClient.requests().peek();
+            assertNotNull(clientRequest);
+            DescribeShareGroupOffsetsRequestData data = 
((DescribeShareGroupOffsetsRequest.Builder) 
clientRequest.requestBuilder()).build().data();
+            assertEquals(GROUP_ID, data.groupId());
+            assertEquals(Collections.singletonList("A"),
+                
data.topics().stream().map(DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
+        } catch (Exception e) {
+            fail(e);
+        }

Review Comment:
   Why do you need the catch block? If any exception is thrown in the method, 
the test will fail anyway. Please remove the catch block and change the method signature to 
`public void testListShareGroupOffsetsOptionsWithBatchedApi() throws Exception {`



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java:
##########
@@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() {
             return new 
ReadShareGroupStateSummaryParameters(groupTopicPartitionData);
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        ReadShareGroupStateSummaryParameters that = 
(ReadShareGroupStateSummaryParameters) o;
+        return Objects.equals(groupTopicPartitionData, 
that.groupTopicPartitionData);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(groupTopicPartitionData);
+    }

Review Comment:
   Seems you missed it.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2409,4 +2191,323 @@ public void 
testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti
             future.get()
         );
     }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithNoOpPersister() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                        .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
+                        
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE)))
+                )
+            );
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersister() throws 
InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ReadShareGroupStateSummaryRequestData 
readShareGroupStateSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryRequestData.PartitionData()
+                        .setPartition(partition)))));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(21)
+                        .setErrorCode(Errors.NONE.code())
+                        .setErrorMessage(Errors.NONE.message())))
+                )
+            );
+
+        ReadShareGroupStateSummaryResponseData 
readShareGroupStateSummaryResponseData = new 
ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new 
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setStartOffset(21)
+                        .setStateEpoch(1)
+                        .setErrorCode(Errors.NONE.code())
+                        .setErrorMessage(Errors.NONE.message())))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters 
readShareGroupStateSummaryParameters = 
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = 
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+            
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError() 
{
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.failedFuture(new Exception("Unable 
to validate read state summary request")));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(future, Exception.class, "Unable to validate read 
state summary request");
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(future, IllegalStateException.class, "Result is 
null for the read state summary");
+    }
+
+    @Test
+    public void 
testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+        service.startup(() -> 1);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
+            new 
ReadShareGroupStateSummaryResult.Builder().setTopicsData(null).build();
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            
.thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+        assertFutureThrows(future, IllegalStateException.class, "Result is 
null for the read state summary");
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build();
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(0)
+                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+                        
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())))
+                )
+            );
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupOffsetsMetadataImageNull() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build(true);
+
+        // Forcing a null Metadata Image
+        service.onNewMetadataImage(null, null);
+
+        int partition = 1;
+        DescribeShareGroupOffsetsRequestData requestData = new 
DescribeShareGroupOffsetsRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+                .setTopicName(TOPIC_NAME)
+                .setPartitions(List.of(partition))
+            ));
+
+        DescribeShareGroupOffsetsResponseData responseData = new 
DescribeShareGroupOffsetsResponseData()
+            .setResponses(
+                List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(0)
+                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+                        
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
+                )
+            );
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+            
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
 requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @FunctionalInterface
+    interface TriFunction<A, B, C, R> {
+        R apply(A a, B b, C c);
+    }
+
+    public static class GroupCoordinatorServiceBuilder {

Review Comment:
   ```suggestion
       private static class GroupCoordinatorServiceBuilder {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to