apoorvmittal10 commented on code in PR #18671: URL: https://github.com/apache/kafka/pull/18671#discussion_r1935632909
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -10441,6 +10443,125 @@ class KafkaApisTest extends Logging { }) } + @Test + def testDescribeShareGroupOffsetsReturnsUnsupportedVersion(): Unit = { + val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics( + util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))) + ) + + val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build()) + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + kafkaApis = createKafkaApis() + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest) + response.data.responses.forEach(topic => topic.partitions().forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode()))) + } + + @Test + def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = { + val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics( + util.List.of(new DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1))) + ) + + val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build()) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) Review Comment: Can we avoid conversions? ```suggestion .thenReturn(util.List.of(AuthorizationResult.ALLOWED)) ``` ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java: ########## @@ -212,17 +216,35 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception { ), 0)), GroupState.STABLE, new Node(0, "host1", 9090), 0, 0); - ListOffsetsResult resultOffsets = new ListOffsetsResult( + ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( - new TopicPartition("topic1", 0), - KafkaFuture.completedFuture(new ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty())) - )); + firstGroup, + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + ) + ); + ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult( + Map.of( + secondGroup, + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), 0L)) + ) + ); when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing))); when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult); when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2))); when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); - when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets); + when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Map<String, Object> argument = invocation.getArgument(0); + if (argument.containsKey(firstGroup)) { + return listShareGroupOffsetsResult1; + } else if (argument.containsKey(secondGroup)) { + return listShareGroupOffsetsResult2; + } + return null; + } + }); Review Comment: ```suggestion when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer( invocation -> { Map<String, Object> argument = invocation.getArgument(0); if (argument.containsKey(firstGroup)) { return listShareGroupOffsetsResult1; } else if (argument.containsKey(secondGroup)) { return listShareGroupOffsetsResult2; } return null; }); ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -2409,4 +2191,323 @@ public void testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti future.get() ); } + + @Test + public void testDescribeShareGroupOffsetsWithNoOpPersister() throws InterruptedException, ExecutionException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET) + .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE) + .setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE))) + ) + ); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersister() throws InterruptedException, ExecutionException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition))))); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(21) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()))) + ) + ); + + ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() + .setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setStartOffset(21) + .setStateEpoch(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()))) + ) + ); + + ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData); + ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData); + when(persister.readSummary( + ArgumentMatchers.eq(readShareGroupStateSummaryParameters) + )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError() { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + when(persister.readSummary(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate read state summary request"))); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + assertFutureThrows(future, Exception.class, "Unable to validate read state summary request"); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + when(persister.readSummary(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = + new ReadShareGroupStateSummaryResult.Builder().setTopicsData(null).build(); + + when(persister.readSummary(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); + } + + @Test + public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(0) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message()))) + ) + ); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupOffsetsMetadataImageNull() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(true); + + // Forcing a null Metadata Image + service.onNewMetadataImage(null, null); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))) + ) + ); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @FunctionalInterface + interface TriFunction<A, B, C, R> { Review Comment: ```suggestion private interface TriFunction<A, B, C, R> { ``` ########## clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions; +import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest; +import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call + */ +public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> { + + private final Map<String, ListShareGroupOffsetsSpec> groupSpecs; + private final Logger log; + private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy; + + public ListShareGroupOffsetsHandler( + Map<String, ListShareGroupOffsetsSpec> groupSpecs, + LogContext logContext) { + this.groupSpecs = groupSpecs; + this.log = logContext.logger(ListShareGroupOffsetsHandler.class); + this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + } + + public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) { + return AdminApiFuture.forKeys(coordinatorKeys(groupIds)); + } + + @Override + public String apiName() { + return "describeShareGroupOffsets"; + } + + @Override + public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() { + return lookupStrategy; + } + + @Override + public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) { + List<String> groupIds = keys.stream().map(key -> { + if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) { + throw new IllegalArgumentException("Invalid group coordinator key " + key + + " when building `DescribeShareGroupOffsets` request"); + } + return key.idValue; + }).collect(Collectors.toList()); + // The DescribeShareGroupOffsetsRequest only includes a single group ID at this point, which is likely a mistake to be fixing a follow-on PR. + String groupId = groupIds.isEmpty() ? null : groupIds.get(0); + if (groupId == null) { + throw new IllegalArgumentException("Missing group id in request"); + } + ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId); + List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic> topics = + spec.topicPartitions().stream().map( + topicPartition -> new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(topicPartition.topic()) + .setPartitions(List.of(topicPartition.partition())) + ).collect(Collectors.toList()); + DescribeShareGroupOffsetsRequestData data = new DescribeShareGroupOffsetsRequestData() + .setGroupId(groupId) + .setTopics(topics); + return new DescribeShareGroupOffsetsRequest.Builder(data, true); + } + + @Override + public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> handleResponse(Node coordinator, + Set<CoordinatorKey> groupIds, + AbstractResponse abstractResponse) { + final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse; + final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + + for (CoordinatorKey groupId : groupIds) { + Map<TopicPartition, Long> data = new HashMap<>(); + response.data().responses().stream().map( + describedTopic -> + describedTopic.partitions().stream().map( + partition -> + data.put(new TopicPartition(describedTopic.topicName(), partition.partitionIndex()), partition.startOffset()) + ).collect(Collectors.toList()) + ).collect(Collectors.toList()); + completed.put(groupId, data); + } + return new ApiResult<>(completed, failed, new ArrayList<>()); Review Comment: ```suggestion return new ApiResult<>(completed, failed, Collections.emptyList()); ``` ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -8970,4 +8975,85 @@ public void testRemoveRaftVoterRequest(boolean fail, boolean sendClusterId) thro assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"), requestData.get().voterDirectoryId()); } } + + @Test + public void testListShareGroupOffsetsOptionsWithBatchedApi() { + final Cluster cluster = mockCluster(3, 0); + final Time time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRIES_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + final List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("A", 0)); + final ListShareGroupOffsetsOptions options = new ListShareGroupOffsetsOptions(); + + final ListShareGroupOffsetsSpec groupSpec = new ListShareGroupOffsetsSpec() + .topicPartitions(partitions); + Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>(); + groupSpecs.put(GROUP_ID, groupSpec); + + env.adminClient().listShareGroupOffsets(groupSpecs, options); + + final MockClient mockClient = env.kafkaClient(); + waitForRequest(mockClient, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS); + + ClientRequest clientRequest = mockClient.requests().peek(); + assertNotNull(clientRequest); + DescribeShareGroupOffsetsRequestData data = ((DescribeShareGroupOffsetsRequest.Builder) clientRequest.requestBuilder()).build().data(); + assertEquals(GROUP_ID, data.groupId()); + assertEquals(Collections.singletonList("A"), + data.topics().stream().map(DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList())); + } catch (Exception e) { + fail(e); + } Review Comment: Why do you need the catch block? If any exception is thrown in the method then will it not fail the test anyways? Change the method signature to `public void testListShareGroupOffsetsOptionsWithBatchedApi() throws Exception {` ########## server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java: ########## @@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() { return new ReadShareGroupStateSummaryParameters(groupTopicPartitionData); } } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ReadShareGroupStateSummaryParameters that = (ReadShareGroupStateSummaryParameters) o; + return Objects.equals(groupTopicPartitionData, that.groupTopicPartitionData); + } + + @Override + public int hashCode() { + return Objects.hashCode(groupTopicPartitionData); + } Review Comment: Seems you missed it. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ########## @@ -2409,4 +2191,323 @@ public void testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti future.get() ); } + + @Test + public void testDescribeShareGroupOffsetsWithNoOpPersister() throws InterruptedException, ExecutionException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET) + .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE) + .setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE))) + ) + ); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersister() throws InterruptedException, ExecutionException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData() + .setPartition(partition))))); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(21) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()))) + ) + ); + + ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData() + .setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setStartOffset(21) + .setStateEpoch(1) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message()))) + ) + ); + + ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData); + ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData); + when(persister.readSummary( + ArgumentMatchers.eq(readShareGroupStateSummaryParameters) + )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError() { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + when(persister.readSummary(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate read state summary request"))); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + assertFutureThrows(future, Exception.class, "Unable to validate read state summary request"); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + when(persister.readSummary(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.completedFuture(null)); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); + } + + @Test + public void testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + Persister persister = mock(DefaultStatePersister.class); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .setPersister(persister) + .build(true); + service.startup(() -> 1); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = + new ReadShareGroupStateSummaryResult.Builder().setTopicsData(null).build(); + + when(persister.readSummary(ArgumentMatchers.any())) + .thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult)); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + assertFutureThrows(future, IllegalStateException.class, "Result is null for the read state summary"); + } + + @Test + public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(0) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message()))) + ) + ); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @Test + public void testDescribeShareGroupOffsetsMetadataImageNull() throws ExecutionException, InterruptedException { + CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorServiceBuilder() + .setConfig(createConfig()) + .setRuntime(runtime) + .build(true); + + // Forcing a null Metadata Image + service.onNewMetadataImage(null, null); + + int partition = 1; + DescribeShareGroupOffsetsRequestData requestData = new DescribeShareGroupOffsetsRequestData() + .setGroupId("share-group-id") + .setTopics(List.of(new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(partition)) + )); + + DescribeShareGroupOffsetsResponseData responseData = new DescribeShareGroupOffsetsResponseData() + .setResponses( + List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic() + .setTopicName(TOPIC_NAME) + .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setStartOffset(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))) + ) + ); + + CompletableFuture<DescribeShareGroupOffsetsResponseData> future = + service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData); + + assertEquals(responseData, future.get()); + } + + @FunctionalInterface + interface TriFunction<A, B, C, R> { + R apply(A a, B b, C c); + } + + public static class GroupCoordinatorServiceBuilder { Review Comment: ```suggestion private static class GroupCoordinatorServiceBuilder { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org