chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r584221036



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -365,17 +123,70 @@ public int sessionId() {
      * @param partIterator  The partition iterator.
      * @return              The response size in bytes.
      */
-    public static <T extends BaseRecords> int sizeOf(short version,
-                                                     
Iterator<Map.Entry<TopicPartition, PartitionData<T>>> partIterator) {
+    public static int sizeOf(short version,
+                             Iterator<Map.Entry<TopicPartition, 
FetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
         // use arbitrary values here without affecting the result.
-        FetchResponseData data = toMessage(0, Errors.NONE, partIterator, 
INVALID_SESSION_ID);
+        LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> data = 
new LinkedHashMap<>();
+        partIterator.forEachRemaining(entry -> data.put(entry.getKey(), 
entry.getValue()));
         ObjectSerializationCache cache = new ObjectSerializationCache();
-        return 4 + data.size(cache, version);
+        return 4 + FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, 
data).data.size(cache, version);
     }
 
     @Override
     public boolean shouldClientThrottle(short version) {
         return version >= 8;
     }
-}
+
+    public static Optional<FetchResponseData.EpochEndOffset> 
divergingEpoch(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() < 0 ? 
Optional.empty()
+                : Optional.of(partitionResponse.divergingEpoch());
+    }
+
+    public static boolean isDivergingEpoch(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.divergingEpoch().epoch() >= 0;
+    }
+
+    public static Optional<Integer> 
preferredReadReplica(FetchResponseData.PartitionData partitionResponse) {
+        return partitionResponse.preferredReadReplica() == 
INVALID_PREFERRED_REPLICA_ID ? Optional.empty()
+                : Optional.of(partitionResponse.preferredReadReplica());
+    }
+
+    public static boolean isPreferredReplica(FetchResponseData.PartitionData 
partitionResponse) {
+        return partitionResponse.preferredReadReplica() != 
INVALID_PREFERRED_REPLICA_ID;
+    }
+
+    public static FetchResponseData.PartitionData partitionResponse(int 
partition, Errors error) {
+        return new FetchResponseData.PartitionData()
+                .setPartitionIndex(partition)
+                .setErrorCode(error.code())
+                .setHighWatermark(FetchResponse.INVALID_HIGH_WATERMARK)
+                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
+                .setLogStartOffset(FetchResponse.INVALID_LOG_START_OFFSET)
+                .setAbortedTransactions(null)
+                .setRecords(MemoryRecords.EMPTY);
+    }
+
+    /**
+     * cast the BaseRecords of PartitionData to Records. This is used to 
eliminate duplicate code of type casting.

Review comment:
       The data from KRPC always uses `MemoryRecords`, so the cast should never 
fail when the data comes from KRPC. I will add more comments for this case.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to