chia7712 commented on code in PR #19167:
URL: https://github.com/apache/kafka/pull/19167#discussion_r2024083530


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java:
##########
@@ -392,13 +393,9 @@ private ShareFetchResponse shareFetchResponse(TopicIdPartition tip, int count) {
             .setPartitionIndex(tip.partition())
             .setRecords(records)
             .setAcquiredRecords(List.of(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(count - 1).setDeliveryCount((short) 1)));
-        ShareFetchResponseData.ShareFetchableTopicResponse topicResponse = new ShareFetchResponseData.ShareFetchableTopicResponse()
-            .setTopicId(tip.topicId())
-            .setPartitions(List.of(partData));
-        return new ShareFetchResponse(
-            new ShareFetchResponseData()
-                .setResponses(List.of(topicResponse))
-        );
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> topicIdPartitionToPartition = new LinkedHashMap<>();
+        topicIdPartitionToPartition.put(tip, partData);
+        return ShareFetchResponse.of(Errors.NONE, 0, topicIdPartitionToPartition, List.of(), 0);

Review Comment:
   ```java
   return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(Map.of(tip, partData)), List.of(), 0);
   ```
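
   The one-liner above relies on the `LinkedHashMap` copy constructor together with `Map.of`. A minimal standalone sketch of that idiom follows, using plain `String`/`Integer` stand-ins rather than the Kafka types; the class name `SingleEntryMapSketch` and the helper `singleEntry` are hypothetical and exist only for illustration.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class SingleEntryMapSketch {
       // Hypothetical helper: copies the immutable one-entry map produced by
       // Map.of(...) into a fresh, mutable LinkedHashMap.
       static <K, V> LinkedHashMap<K, V> singleEntry(K key, V value) {
           return new LinkedHashMap<>(Map.of(key, value));
       }

       public static void main(String[] args) {
           LinkedHashMap<String, Integer> map = singleEntry("topic-0", 42);
           // The copy is mutable, unlike the Map.of(...) instance it was built from.
           map.put("topic-1", 7);
           System.out.println(map);
       }
   }
   ```

   Two caveats: `Map.of` rejects null keys and values, and its iteration order is unspecified for more than one entry, so this shortcut only preserves ordering trivially in the single-entry case used here.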



##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java:
##########
@@ -172,6 +186,11 @@ public static ShareFetchResponseData toMessage(Errors error, int throttleTimeMs,
             ShareFetchResponseData.PartitionData partitionData = entry.getValue();
             // Since PartitionData alone doesn't know the partition ID, we set it here
             partitionData.setPartitionIndex(entry.getKey().topicPartition().partition());
+            // To protect the clients from failing due to null records,

Review Comment:
   Could you please make the `toMessage` method private?



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -360,7 +360,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
                         .setPartitions(partitionResponses));
             });
         }
-        return new FetchResponse(new FetchResponseData()
+        return FetchResponse.of(new FetchResponseData()

Review Comment:
   For versions prior to 13, the data is iterated twice. Consider making the
   `FetchResponse` constructor package-private so that this call site can keep
   using it directly and avoid the double loop.
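
   One way to read this suggestion, sketched with hypothetical stand-in types (this is not the Kafka `FetchResponse`, and the null-to-empty normalization is an assumption made for illustration): a public factory makes one normalizing pass over the data, while a package-private constructor lets same-package callers that already built the data in their own loop skip that second pass.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class FactoryVsConstructorSketch {
       // Hypothetical stand-in for a response type; not the real FetchResponse.
       static final class Response {
           final List<String> records;

           // Package-private: same-package callers that already hold
           // normalized data can construct directly, with no extra pass.
           Response(List<String> records) {
               this.records = records;
           }

           // Factory for everyone else: one pass that replaces null entries
           // with an empty placeholder before constructing.
           public static Response of(List<String> raw) {
               List<String> cleaned = new ArrayList<>(raw.size());
               for (String r : raw) {
                   cleaned.add(r == null ? "" : r);
               }
               return new Response(cleaned);
           }
       }

       // Same-package caller: builds the entries in a single loop and then
       // uses the constructor, so the data is iterated only once.
       static Response errorResponse(int partitions) {
           List<String> records = new ArrayList<>(partitions);
           for (int i = 0; i < partitions; i++) {
               records.add("");
           }
           return new Response(records);
       }

       public static void main(String[] args) {
           System.out.println(errorResponse(3).records.size());
       }
   }
   ```

   Routing `errorResponse` through `Response.of` would still be correct, but it would walk the list a second time without changing anything.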


