jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r572377972



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -133,6 +200,71 @@ public String toString() {
         return Collections.unmodifiableMap(result);
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> {
+            String name = topicNames.get(fetchTopic.topicId());
+            if (name != null) {
+                fetchTopic.partitions().forEach(fetchPartition ->
+                    result.put(new TopicPartition(name, fetchPartition.partition()),
+                        new PartitionData(
+                                fetchPartition.fetchOffset(),
+                                fetchPartition.logStartOffset(),
+                                fetchPartition.partitionMaxBytes(),
+                                optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                                optionalEpoch(fetchPartition.lastFetchedEpoch())
+                        )
+                    )
+                );
+            }
+        });
+        return Collections.unmodifiableMap(result);
+    }
+
+    // Only used when Fetch is version 13 or greater.
+    private FetchDataAndError toPartitionDataMapAndError(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) {
+        Map<TopicPartition, PartitionData> fetchData = new LinkedHashMap<>();
+        List<UnresolvedPartitions> unresolvedPartitions = new LinkedList<>();
+        Map<Uuid, FetchResponse.IdError> idErrors = new HashMap<>();
+        Errors error;
+        if (topicNames.isEmpty()) {
+            error = Errors.UNSUPPORTED_VERSION;
+        } else {
+            error = Errors.UNKNOWN_TOPIC_ID;
+        }
+        fetchableTopics.forEach(fetchTopic -> {
+            String name = topicNames.get(fetchTopic.topicId());
+            if (name != null) {
+                fetchTopic.partitions().forEach(fetchPartition ->
+                        fetchData.put(new TopicPartition(name, fetchPartition.partition()),
+                                new PartitionData(
+                                        fetchPartition.fetchOffset(),
+                                        fetchPartition.logStartOffset(),
+                                        fetchPartition.partitionMaxBytes(),
+                                        optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                                        optionalEpoch(fetchPartition.lastFetchedEpoch())
+                                )
+                        )
+                );
+            } else {
+                unresolvedPartitions.add(new UnresolvedPartitions(fetchTopic.topicId(), fetchTopic.partitions().stream().collect(Collectors.toMap(
+                        FetchRequestData.FetchPartition::partition, fetchPartition -> new PartitionData(
+                        fetchPartition.fetchOffset(),
+                        fetchPartition.logStartOffset(),
+                        fetchPartition.partitionMaxBytes(),
+                        optionalEpoch(fetchPartition.currentLeaderEpoch()),
+                        optionalEpoch(fetchPartition.lastFetchedEpoch()))))));
+
+                if (idErrors.containsKey(fetchTopic.topicId()))
+                    idErrors.get(fetchTopic.topicId()).addPartitions(fetchTopic.partitions().stream().map(part -> part.partition()).collect(Collectors.toList()));

Review comment:
I realize this is a bit confusing. The `addPartitions` method takes a list of partition indexes.
What this line does is grab the existing `IdError` object for this topic ID and add this topic's partitions to it.
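
To make the intent easier to follow, here is a minimal, self-contained sketch of the same lookup-then-add pattern. It is not the PR's code: `IdErrorSketch`, the nested `IdError` stand-in, the `record` helper, and the `String` keys (the PR uses `Uuid`) are hypothetical names for illustration. The `else` branch that creates a new entry is also an assumption, since the quoted hunk ends before that part is visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdErrorSketch {
    // Hypothetical stand-in for FetchResponse.IdError. Only the idea of an
    // addPartitions(List) method comes from the diff; the rest is assumed.
    static final class IdError {
        private final List<Integer> partitions = new ArrayList<>();

        IdError(List<Integer> initial) {
            partitions.addAll(initial);
        }

        void addPartitions(List<Integer> more) {
            partitions.addAll(more);
        }

        List<Integer> partitions() {
            return partitions;
        }
    }

    // Hypothetical helper: if an IdError already exists for the topic ID,
    // append the partitions to it; otherwise create one (assumed behavior).
    static void record(Map<String, IdError> idErrors, String topicId, List<Integer> partitions) {
        IdError existing = idErrors.get(topicId);
        if (existing != null) {
            existing.addPartitions(partitions);
        } else {
            idErrors.put(topicId, new IdError(partitions));
        }
    }

    public static void main(String[] args) {
        Map<String, IdError> idErrors = new HashMap<>();
        record(idErrors, "topic-a", Arrays.asList(0, 1));
        record(idErrors, "topic-a", Arrays.asList(2));
        System.out.println(idErrors.get("topic-a").partitions()); // prints [0, 1, 2]
    }
}
```

Pulling the lookup into a local variable also avoids the separate `containsKey` and `get` calls, which may read more clearly than the one-liner in the diff.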




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

