apoorvmittal10 commented on code in PR #18671: URL: https://github.com/apache/kafka/pull/18671#discussion_r1931902345
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -3796,12 +3797,14 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } - // To do in a follow-up PR @Override public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs, final ListShareGroupOffsetsOptions options) { - // To-do - throw new InvalidRequestException("The method is not yet implemented"); + SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> future = + ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); Review Comment: nit: merge into a single line. ########## server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java: ########## @@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() { return new ReadShareGroupStateSummaryParameters(groupTopicPartitionData); } } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ReadShareGroupStateSummaryParameters that = (ReadShareGroupStateSummaryParameters) o; + return Objects.equals(groupTopicPartitionData, that.groupTopicPartitionData); + } + + @Override + public int hashCode() { + return Objects.hashCode(groupTopicPartitionData); + } Review Comment: Do you mean usage in `assertions`? If yes, then I am sure there is a better way to assert than adding these methods. It is only a nit, though, so I will leave it up to you. 
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest] - // TODO: Implement the DescribeShareGroupOffsetsRequest handling - requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, describeShareGroupOffsetsRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val topicNamesToIds = metadataCache.topicNamesToIds() + val topicIdToNames = metadataCache.topicIdsToNames() + + val readStateSummaryData = getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest( + describeShareGroupOffsetsRequest.data(), + topicNamesToIds + ) + groupCoordinator.describeShareGroupOffsets( + request.context, + readStateSummaryData, + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle( + request, + new DescribeShareGroupOffsetsResponse( + getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response, topicIdToNames) + ) + ) + } + } + } + } + + private def getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData: 
DescribeShareGroupOffsetsRequestData, + topicNamesId: util.Map[String, Uuid] + ): ReadShareGroupStateSummaryRequestData = { + val readStateSummaryTopics = describeShareGroupOffsetsRequestData.topics.asScala.map( + topic => { + val partitions = topic.partitions.asScala.map( + partitionIndex => { + new PartitionData() + .setPartition(partitionIndex) + .setLeaderEpoch(0) + } + ).asJava + new ReadStateSummaryData() + .setTopicId(topicNamesId.get(topic.topicName())) + .setPartitions(partitions) + } + ).asJava Review Comment: Your IDE might be wrong, but the build will be fine :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org