AndrewJSchofield commented on code in PR #19815:
URL: https://github.com/apache/kafka/pull/19815#discussion_r2113705265


##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -451,15 +451,15 @@ public void testWriteStateFencedLeaderEpochError() {
                         .setFirstOffset(11)
                         .setLastOffset(20)
                         .setDeliveryCount((short) 1)
-                        .setDeliveryState((byte) 0)))))));
+                                .setDeliveryState((byte) 0)))).iterator()))).iterator()));

Review Comment:
   nit: indentation



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -714,9 +714,11 @@ public CoordinatorResult<DeleteShareGroupOffsetsResultHolder, CoordinatorRecord>
                 );
             }
 
+            DeleteShareGroupStateRequestData.DeleteStateDataCollection topicCollection = new DeleteShareGroupStateRequestData.DeleteStateDataCollection();
+            deleteShareGroupStateRequestTopicsData.forEach(d -> topicCollection.add(d.duplicate()));

Review Comment:
   Is this duplication really necessary?



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -807,9 +815,10 @@ public CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestC
 
                 DeleteShareGroupStateRequestData requestForCurrentPartition = new DeleteShareGroupStateRequestData()
                     .setGroupId(groupId)
-                    .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
+                    .setTopics(new DeleteShareGroupStateRequestData.DeleteStateDataCollection(
+                        List.of(new DeleteShareGroupStateRequestData.DeleteStateData()
                         .setTopicId(topicId)
-                        .setPartitions(List.of(partitionData))));
+                        .setPartitions(new DeleteShareGroupStateRequestData.PartitionDataCollection(List.of(partitionData.duplicate()).iterator()))).iterator()));

Review Comment:
   I'm trying to decide whether it is really worth making this many changes to
`mapKey`. Situations like this add a lot of complexity. In test cases, that's
not really important; in the main code paths, it's more of a concern. What do
you think?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3812,7 +3814,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message())
         )
       } else {
-        authorizedTopics.add(topic)
+        authorizedTopics.add(topic.duplicate())

Review Comment:
   Same question here about the `duplicate()` call.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3696,14 +3696,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestContext,
       DESCRIBE,
       TOPIC,
-      groupDescribeOffsetsRequest.topics.asScala
+      groupDescribeOffsetsRequest.topics.valuesList.asScala
     )(_.topicName)
 
+    val topicCollection = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopicCollection()
+    authorizedTopics.foreach(t => topicCollection.add(t.duplicate))

Review Comment:
   I don't see why we now need duplication where previously we did not. Could 
you explain? Thanks.



