apoorvmittal10 commented on code in PR #18671:
URL: https://github.com/apache/kafka/pull/18671#discussion_r1931902345


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -3796,12 +3797,14 @@ public DescribeShareGroupsResult 
describeShareGroups(final Collection<String> gr
                 .collect(Collectors.toMap(entry -> entry.getKey().idValue, 
Map.Entry::getValue)));
     }
 
-    // To do in a follow-up PR
     @Override
     public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, 
ListShareGroupOffsetsSpec> groupSpecs,
                                                              final 
ListShareGroupOffsetsOptions options) {
-        // To-do
-        throw new InvalidRequestException("The method is not yet implemented");
+        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> future 
=
+            ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());

Review Comment:
   nit: merge in single line.



##########
server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java:
##########
@@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() {
             return new 
ReadShareGroupStateSummaryParameters(groupTopicPartitionData);
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        ReadShareGroupStateSummaryParameters that = 
(ReadShareGroupStateSummaryParameters) o;
+        return Objects.equals(groupTopicPartitionData, 
that.groupTopicPartitionData);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(groupTopicPartitionData);
+    }

Review Comment:
   Do you mean usage in `assertions`? If yes, then I am sure there could be a 
better way to assert rather having these. Though nit so leave it on you.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3185,9 +3187,87 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): 
Unit = {
     val describeShareGroupOffsetsRequest = 
request.body[DescribeShareGroupOffsetsRequest]
-    // TODO: Implement the DescribeShareGroupOffsetsRequest handling
-    requestHelper.sendMaybeThrottle(request, 
describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, 
describeShareGroupOffsetsRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
describeShareGroupOffsetsRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
describeShareGroupOffsetsRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val topicNamesToIds = metadataCache.topicNamesToIds()
+      val topicIdToNames = metadataCache.topicIdsToNames()
+
+      val readStateSummaryData = 
getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(
+        describeShareGroupOffsetsRequest.data(),
+        topicNamesToIds
+      )
+      groupCoordinator.describeShareGroupOffsets(
+        request.context,
+        readStateSummaryData,
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
describeShareGroupOffsetsRequest.getErrorResponse(exception))
+        } else {
+          requestHelper.sendMaybeThrottle(
+            request,
+            new DescribeShareGroupOffsetsResponse(
+              
getDescribeShareGroupOffsetsResponseFromReadShareGroupStateSummaryResponse(response,
 topicIdToNames)
+            )
+          )
+        }
+      }
+    }
+  }
+
+  private def 
getReadShareGroupStateSummaryRequestFromDescribeShareGroupOffsetsRequest(describeShareGroupOffsetsRequestData:
 DescribeShareGroupOffsetsRequestData,
+                                                                               
        topicNamesId: util.Map[String, Uuid]
+                                                                               
       ): ReadShareGroupStateSummaryRequestData = {
+    val readStateSummaryTopics = 
describeShareGroupOffsetsRequestData.topics.asScala.map(
+      topic => {
+        val partitions = topic.partitions.asScala.map(
+          partitionIndex => {
+            new PartitionData()
+              .setPartition(partitionIndex)
+              .setLeaderEpoch(0)
+          }
+        ).asJava
+        new ReadStateSummaryData()
+          .setTopicId(topicNamesId.get(topic.topicName()))
+          .setPartitions(partitions)
+      }
+    ).asJava

Review Comment:
   Your IDE might be wrong, but the build will be fine :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to