jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r572377972
########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ########## @@ -133,6 +200,71 @@ public String toString() { return Collections.unmodifiableMap(result); } + private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) { + Map<TopicPartition, PartitionData> result = new LinkedHashMap<>(); + fetchableTopics.forEach(fetchTopic -> { + String name = topicNames.get(fetchTopic.topicId()); + if (name != null) { + fetchTopic.partitions().forEach(fetchPartition -> + result.put(new TopicPartition(name, fetchPartition.partition()), + new PartitionData( + fetchPartition.fetchOffset(), + fetchPartition.logStartOffset(), + fetchPartition.partitionMaxBytes(), + optionalEpoch(fetchPartition.currentLeaderEpoch()), + optionalEpoch(fetchPartition.lastFetchedEpoch()) + ) + ) + ); + } + }); + return Collections.unmodifiableMap(result); + } + + // Only used when Fetch is version 13 or greater. 
+ private FetchDataAndError toPartitionDataMapAndError(List<FetchRequestData.FetchTopic> fetchableTopics, Map<Uuid, String> topicNames) { + Map<TopicPartition, PartitionData> fetchData = new LinkedHashMap<>(); + List<UnresolvedPartitions> unresolvedPartitions = new LinkedList<>(); + Map<Uuid, FetchResponse.IdError> idErrors = new HashMap<>(); + Errors error; + if (topicNames.isEmpty()) { + error = Errors.UNSUPPORTED_VERSION; + } else { + error = Errors.UNKNOWN_TOPIC_ID; + } + fetchableTopics.forEach(fetchTopic -> { + String name = topicNames.get(fetchTopic.topicId()); + if (name != null) { + fetchTopic.partitions().forEach(fetchPartition -> + fetchData.put(new TopicPartition(name, fetchPartition.partition()), + new PartitionData( + fetchPartition.fetchOffset(), + fetchPartition.logStartOffset(), + fetchPartition.partitionMaxBytes(), + optionalEpoch(fetchPartition.currentLeaderEpoch()), + optionalEpoch(fetchPartition.lastFetchedEpoch()) + ) + ) + ); + } else { + unresolvedPartitions.add(new UnresolvedPartitions(fetchTopic.topicId(), fetchTopic.partitions().stream().collect(Collectors.toMap( + FetchRequestData.FetchPartition::partition, fetchPartition -> new PartitionData( + fetchPartition.fetchOffset(), + fetchPartition.logStartOffset(), + fetchPartition.partitionMaxBytes(), + optionalEpoch(fetchPartition.currentLeaderEpoch()), + optionalEpoch(fetchPartition.lastFetchedEpoch())))))); + + if (idErrors.containsKey(fetchTopic.topicId())) + idErrors.get(fetchTopic.topicId()).addPartitions(fetchTopic.partitions().stream().map(part -> part.partition()).collect(Collectors.toList())); Review comment: I realize this is a bit confusing. The addPartitions method takes a list. What this line is doing is grabbing the idError object and adding partitions to it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org