sjhajharia commented on code in PR #18671:
URL: https://github.com/apache/kafka/pull/18671#discussion_r1931659169


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -256,6 +258,18 @@ CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetchAllOffs
         boolean requireStable
     );
 
+    /**
+     * Fetch the Share Group Offsets for a given group.
+     *
+     * @param context The request context
+     * @param request The DescribeShareGroupOffsets request.
+     * @return A future yielding the results.
+     */
+    CompletableFuture<ReadShareGroupStateSummaryResponseData> describeShareGroupOffsets(

Review Comment:
   I am not the biggest fan of this either. It is only needed because 
`DescribeShareGroupOffsetsRequest/ResponseData` carries the topicName, while 
`ReadShareGroupStateSummaryRequest/ResponseData` needs the topicId. 
   For the conversion I can use the topicNamesToIds and topicIdToNames maps, 
which are available in KafkaApis. The alternative would be to pass these two 
maps to the GroupCoordinator as well, but that would deviate from the way all 
other GC methods are written.
   If there is a way for the GC to access these mappings directly, the 
conversion can be done there. Do you think I should pass the two maps to the 
GC as well?
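
   To make the conversion concrete, here is a minimal sketch of a name-to-ID 
resolution step. It is not the PR's code: `TopicNameResolver` and `Resolution` 
are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid` so 
the example is self-contained. It also separates names that are missing from 
the map, since `Map.get` returns `null` for them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper, not part of Kafka: resolves topic names to IDs up front
// so that the downstream call only has to deal with IDs.
final class TopicNameResolver {

    // Outcome of one resolution pass: names that mapped to an ID and names that did not.
    static final class Resolution {
        final Map<String, UUID> resolved = new LinkedHashMap<>();
        final List<String> unknown = new ArrayList<>();
    }

    // java.util.UUID is an assumption standing in for Kafka's Uuid type.
    static Resolution resolve(List<String> topicNames, Map<String, UUID> topicNamesToIds) {
        Resolution result = new Resolution();
        for (String name : topicNames) {
            UUID id = topicNamesToIds.get(name);
            if (id == null) {
                // The name is not in the map; record it instead of passing null on.
                result.unknown.add(name);
            } else {
                result.resolved.put(name, id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, UUID> cache =
            Map.of("orders", UUID.fromString("00000000-0000-0000-0000-000000000001"));
        Resolution r = resolve(List.of("orders", "payments"), cache);
        System.out.println("resolved=" + r.resolved.keySet() + " unknown=" + r.unknown);
    }
}
```

   Whether unknown names should be rejected up front or forwarded with an 
error code is a design choice that this sketch leaves open.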



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java:
##########
@@ -37,7 +37,7 @@ public class ListShareGroupOffsetsResult {
 
     private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
 
-    ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) {
+    public ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) {

Review Comment:
   The constructor needs public access because the return type of 
`Admin.listShareGroupOffsets()` is `ListShareGroupOffsetsResult`. When 
`ShareGroupCommandTest` mocks the behaviour of the AdminClient, it has to 
create an instance of `ListShareGroupOffsetsResult`. Please see 
`ShareGroupCommandTest` for more details on the usage.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
     val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest]
-    // TODO: Implement the DescribeShareGroupOffsetsRequest handling
-    requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, describeShareGroupOffsetsRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val topicNamesToIds = metadataCache.topicNamesToIds()
+      val topicIdToNames = metadataCache.topicIdsToNames()
+
+      val readStateSummaryData = getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(
+        describeShareGroupOffsetsRequest.data(),
+        topicNamesToIds
+      )
+      groupCoordinator.describeShareGroupOffsets(
+        request.context,
+        readStateSummaryData,
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(
+            request,
+            new DescribeShareGroupOffsetsResponse(
+              getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response, topicIdToNames)
+            )
+          )
+        }
+      }
+    }
+  }
+
+  private def getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData: DescribeShareGroupOffsetsRequestData,
+                                                                                       topicNamesId: util.Map[String, Uuid]
+                                                                                      ): ReadShareGroupStateSummaryRequestData = {
+    val readStateSummaryTopics = describeShareGroupOffsetsRequestData.topics.asScala.map(
+      topic => {
+        val partitions = topic.partitions.asScala.map(
+          partitionIndex => {
+            new PartitionData()
+              .setPartition(partitionIndex)
+              .setLeaderEpoch(0)
+          }
+        ).asJava
+        new ReadStateSummaryData()
+          .setTopicId(topicNamesId.get(topic.topicName()))
+          .setPartitions(partitions)
+      }
+    ).asJava

Review Comment:
   They seem to be necessary. Getting rid of them causes type mismatches.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
     val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest]
-    // TODO: Implement the DescribeShareGroupOffsetsRequest handling
-    requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, describeShareGroupOffsetsRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val topicNamesToIds = metadataCache.topicNamesToIds()
+      val topicIdToNames = metadataCache.topicIdsToNames()
+
+      val readStateSummaryData = getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(
+        describeShareGroupOffsetsRequest.data(),
+        topicNamesToIds
+      )
+      groupCoordinator.describeShareGroupOffsets(
+        request.context,
+        readStateSummaryData,
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, describeShareGroupOffsetsRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(
+            request,
+            new DescribeShareGroupOffsetsResponse(
+              getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response, topicIdToNames)
+            )
+          )
+        }
+      }
+    }
+  }
+
+  private def getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData: DescribeShareGroupOffsetsRequestData,
+                                                                                       topicNamesId: util.Map[String, Uuid]
+                                                                                      ): ReadShareGroupStateSummaryRequestData = {
+    val readStateSummaryTopics = describeShareGroupOffsetsRequestData.topics.asScala.map(
+      topic => {
+        val partitions = topic.partitions.asScala.map(
+          partitionIndex => {
+            new PartitionData()
+              .setPartition(partitionIndex)
+              .setLeaderEpoch(0)
+          }
+        ).asJava
+        new ReadStateSummaryData()
+          .setTopicId(topicNamesId.get(topic.topicName()))
+          .setPartitions(partitions)
+      }
+    ).asJava
+
+    val result = new ReadShareGroupStateSummaryRequestData()
+      .setGroupId(describeShareGroupOffsetsRequestData.groupId())
+      .setTopics(readStateSummaryTopics)
+    result
+  }
+
+  private def getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(readShareGroupStateSummaryResponseData: ReadShareGroupStateSummaryResponseData,
+                                                                                         topicIdNames: util.Map[Uuid, String]
+                                                                                        ): DescribeShareGroupOffsetsResponseData = {
+    val describeShareGroupOffsetsResponseData = readShareGroupStateSummaryResponseData.results().asScala.map(
+      readStateSummaryResult => {
+        val partitions = readStateSummaryResult.partitions().asScala.map(
+          partitionResult => {
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(partitionResult.partition())
+              .setStartOffset(partitionResult.startOffset())
+              .setLeaderEpoch(partitionResult.stateEpoch())
+              .setErrorCode(partitionResult.errorCode())
+              .setErrorMessage(partitionResult.errorMessage())
+          }
+        ).asJava
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicId(readStateSummaryResult.topicId())
+          .setTopicName(topicIdNames.get(readStateSummaryResult.topicId()))
+          .setPartitions(partitions)
+      }
+    ).asJava

Review Comment:
   Same ^



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java:
##########
@@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() {
             return new ReadShareGroupStateSummaryParameters(groupTopicPartitionData);
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        ReadShareGroupStateSummaryParameters that = (ReadShareGroupStateSummaryParameters) o;
+        return Objects.equals(groupTopicPartitionData, that.groupTopicPartitionData);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(groupTopicPartitionData);
+    }

Review Comment:
   They are required for 
`GroupCoordinatorServiceTest.testDescribeShareGroupOffsetsWithDefaultPersister`.
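
   The comment does not say why the test needs them. One likely reason, stated 
here as an assumption, is that the test compares an expected parameters object 
with the one built by the code under test. Without a value-based `equals`, two 
separately constructed instances never compare equal. The sketch below uses a 
hypothetical stand-in class, `SummaryParameters`, rather than the real 
`ReadShareGroupStateSummaryParameters`.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for a parameters object that wraps a single data field.
final class SummaryParameters {
    private final List<String> data;

    SummaryParameters(List<String> data) {
        this.data = data;
    }

    // Value-based equality: two instances are equal when their wrapped data is equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SummaryParameters that = (SummaryParameters) o;
        return Objects.equals(data, that.data);
    }

    // Must agree with equals: equal objects produce equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hashCode(data);
    }

    public static void main(String[] args) {
        // "expected" plays the role of the object a test builds by hand;
        // "actual" plays the role of the object built by the code under test.
        SummaryParameters expected = new SummaryParameters(List.of("group-1", "topic-a", "0"));
        SummaryParameters actual = new SummaryParameters(List.of("group-1", "topic-a", "0"));
        System.out.println(expected.equals(actual));
        System.out.println(expected.hashCode() == actual.hashCode());
    }
}
```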



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

