sjhajharia commented on code in PR #18671: URL: https://github.com/apache/kafka/pull/18671#discussion_r1931659169
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ########## @@ -256,6 +258,18 @@ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffs boolean requireStable ); + /** + * Fetch the Share Group Offsets for a given group. + * + * @param context The request context + * @param request The DescribeShareGroupOffsets request. + * @return A future yielding the results. + */ + CompletableFuture<ReadShareGroupStateSummaryResponseData> describeShareGroupOffsets( Review Comment: Even I am not the biggest fan of the same. The only reason this needs to be done is because the `DescribeShareGroupOffsetsRequest/ResponseData` has the topicName in them while the `ReadShareGroupStateSummaryRequest/ResponseData` needs the topicId in it. I could use the topicNametoId and topicIdtoName maps which are present in KafkaApis for the same conversion. The alternative here would be to pass these two maps to the GroupCoordinator as well, but that would deviate from the way all other GC methods are written. If there is a way the GC can directly have an access to these mappings, it can be done. Do you think I should pass those two maps to the GC as well? ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java: ########## @@ -37,7 +37,7 @@ public class ListShareGroupOffsetsResult { private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures; - ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) { + public ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) { Review Comment: We would need a public access for the same as the return type for the `Admin.listShareGroupOffsets()` is ListShareGroupOffsetsResult. Thus, in the `ShareGroupCommandTest`, when we mock the behaviour of AdminClient, we would need to create an instance of `ListShareGroupOffsetsResult`. Pls see `ShareGroupCommandTest` for more details on usage. ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest] - // TODO: Implement the DescribeShareGroupOffsetsRequest handling - requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, describeShareGroupOffsetsRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val topicNamesToIds = metadataCache.topicNamesToIds() + val topicIdToNames = metadataCache.topicIdsToNames() + + val readStateSummaryData = getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest( + describeShareGroupOffsetsRequest.data(), + topicNamesToIds + ) + groupCoordinator.describeShareGroupOffsets( + request.context, + readStateSummaryData, + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle( + request, + new DescribeShareGroupOffsetsResponse( + getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response, topicIdToNames) + ) + ) + } + } + } + } + + private def getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData: DescribeShareGroupOffsetsRequestData, + topicNamesId: util.Map[String, Uuid] + ): ReadShareGroupStateSummaryRequestData = { + val readStateSummaryTopics = describeShareGroupOffsetsRequestData.topics.asScala.map( + topic => { + val partitions = topic.partitions.asScala.map( + partitionIndex => { + new PartitionData() + .setPartition(partitionIndex) + .setLeaderEpoch(0) + } + ).asJava + new ReadStateSummaryData() + .setTopicId(topicNamesId.get(topic.topicName())) + .setPartitions(partitions) + } + ).asJava Review Comment: They seems to be necessary. Getting rid of them causes type mismatches, ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = { val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest] - // TODO: Implement the DescribeShareGroupOffsetsRequest handling - requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, describeShareGroupOffsetsRequest.data.groupId)) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val topicNamesToIds = metadataCache.topicNamesToIds() + val topicIdToNames = metadataCache.topicIdsToNames() + + val readStateSummaryData = getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest( + describeShareGroupOffsetsRequest.data(), + topicNamesToIds + ) + groupCoordinator.describeShareGroupOffsets( + request.context, + readStateSummaryData, + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(exception)) + } else { + requestHelper.sendMaybeThrottle( + request, + new DescribeShareGroupOffsetsResponse( + getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response, topicIdToNames) + ) + ) + } + } + } + } + + private def getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData: DescribeShareGroupOffsetsRequestData, + topicNamesId: util.Map[String, Uuid] + ): ReadShareGroupStateSummaryRequestData = { + val readStateSummaryTopics = describeShareGroupOffsetsRequestData.topics.asScala.map( + topic => { + val partitions = topic.partitions.asScala.map( + partitionIndex => { + new PartitionData() + .setPartition(partitionIndex) + .setLeaderEpoch(0) + } + ).asJava + new ReadStateSummaryData() + .setTopicId(topicNamesId.get(topic.topicName())) + .setPartitions(partitions) + } + ).asJava + + val result = new ReadShareGroupStateSummaryRequestData() + .setGroupId(describeShareGroupOffsetsRequestData.groupId()) + .setTopics(readStateSummaryTopics) + result + } + + private def getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(readShareGroupStateSummaryResponseData: ReadShareGroupStateSummaryResponseData, + topicIdNames: util.Map[Uuid, String] + ): DescribeShareGroupOffsetsResponseData = { + val describeShareGroupOffsetsResponseData = readShareGroupStateSummaryResponseData.results().asScala.map( + readStateSummaryResult => { + val partitions = readStateSummaryResult.partitions().asScala.map( + partitionResult => { + new DescribeShareGroupOffsetsResponsePartition() + .setPartitionIndex(partitionResult.partition()) + .setStartOffset(partitionResult.startOffset()) + .setLeaderEpoch(partitionResult.stateEpoch()) + .setErrorCode(partitionResult.errorCode()) + .setErrorMessage(partitionResult.errorMessage()) + } + ).asJava + new DescribeShareGroupOffsetsResponseTopic() + .setTopicId(readStateSummaryResult.topicId()) + .setTopicName(topicIdNames.get(readStateSummaryResult.topicId())) + .setPartitions(partitions) + } + ).asJava Review Comment: Same ^ ########## server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java: ########## @@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() { return new ReadShareGroupStateSummaryParameters(groupTopicPartitionData); } } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ReadShareGroupStateSummaryParameters that = (ReadShareGroupStateSummaryParameters) o; + return Objects.equals(groupTopicPartitionData, that.groupTopicPartitionData); + } + + @Override + public int hashCode() { + return Objects.hashCode(groupTopicPartitionData); + } Review Comment: They are required for the `GroupCoordinatorServiceTest. testDescribeShareGroupOffsetsWithDefaultPersister` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org