dajac commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r454845616



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3888,25 +3892,40 @@ void handleResponse(AbstractResponse abstractResponse) {
                     ListOffsetResponse response = (ListOffsetResponse) 
abstractResponse;
                     Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets 
= new HashMap<>();
 
-                    for (Entry<TopicPartition, PartitionData> result : 
response.responseData().entrySet()) {
-                        TopicPartition tp = result.getKey();
-                        PartitionData partitionData = result.getValue();
-
-                        KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
-                        Errors error = partitionData.error;
-                        OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
-                        if (offsetRequestSpec == null) {
-                            future.completeExceptionally(new 
KafkaException("Unexpected topic partition " + tp + " in broker response!"));
-                        } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
-                            retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
-                        } else if (error == Errors.NONE) {
-                            future.complete(new 
ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, 
partitionData.leaderEpoch));
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (ListOffsetTopicResponse topic : response.topics()) {
+                        for (ListOffsetPartitionResponse partition : 
topic.partitions()) {
+                            TopicPartition tp = new 
TopicPartition(topic.name(), partition.partitionIndex());
+                            KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
+                            Errors error = 
Errors.forCode(partition.errorCode());
+                            OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
+                            if (offsetRequestSpec == null) {
+                                log.warn("Server response mentioned unknown 
topic partition {}", tp);
+                            } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
+                                retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
+                            } else if (error == Errors.NONE) {
+                                Optional<Integer> leaderEpoch = 
(partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH)
+                                        ? Optional.empty() 
+                                        : Optional.of(partition.leaderEpoch());
+                                future.complete(new 
ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
+                            } else {
+                                
future.completeExceptionally(error.exception());
+                            }
                         }
                     }
 
-                    if (!retryTopicPartitionOffsets.isEmpty()) {
+                    if (retryTopicPartitionOffsets.isEmpty()) {
+                        // The server should send back a response for every 
topic partition. But do a sanity check anyway.
+                        Set<TopicPartition> tpsOnBroker = new HashSet<>();
+                        for (ListOffsetTopic topic : partitionsToQuery) {
+                            for (ListOffsetPartition partition : 
topic.partitions()) {
+                                tpsOnBroker.add(new 
TopicPartition(topic.name(), partition.partitionIndex()));
+                            }
+                        }
+                        completeUnrealizedFutures(
+                            futures.entrySet().stream().filter(entry -> 
tpsOnBroker.contains(entry.getKey())),
+                            tp -> "The response from broker " + brokerId +
+                                " did not contain a result for topic partition 
" + tp);

Review comment:
       As we don't have the list of TopicPartition available to filter the list 
of futures, we could actually directly complete the future within the loop 
instead of populating the HashSet. It avoids building the HashSet and having to 
traverse the futures.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3870,12 +3873,13 @@ public ListOffsetsResult 
listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<TopicPartition, 
ListOffsetRequest.PartitionData>> entry: leaders.entrySet()) {
+        for (final Map.Entry<Node, Map<String, ListOffsetTopic>> entry: 
leaders.entrySet()) {

Review comment:
       nit: We usually put a space before and after `:`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, 
RequestFuture<ListOffsetResult> f
      *               value of each partition may be null only for v0. In v1 
and later the ListOffset API would not
      *               return a null timestamp (-1 is returned instead when 
necessary).
      */
-    private void handleListOffsetResponse(Map<TopicPartition, 
ListOffsetRequest.PartitionData> timestampsToSearch,
+    private void handleListOffsetResponse(Map<TopicPartition, 
ListOffsetPartition> timestampsToSearch,
                                           ListOffsetResponse 
listOffsetResponse,
                                           RequestFuture<ListOffsetResult> 
future) {
         Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>();
         Set<TopicPartition> partitionsToRetry = new HashSet<>();
         Set<String> unauthorizedTopics = new HashSet<>();
 
-        for (Map.Entry<TopicPartition, ListOffsetRequest.PartitionData> entry 
: timestampsToSearch.entrySet()) {
+        Map<TopicPartition, ListOffsetPartitionResponse> partitionsData = 
byTopicPartitions(listOffsetResponse.responseData());

Review comment:
       I agree that we should at minimum avoid hitting a NPE.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -965,11 +994,11 @@ public void onFailure(RuntimeException e) {
      * @return A response which can be polled to obtain the corresponding 
timestamps and offsets.
      */
     private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node 
node,
-                                                                  final 
Map<TopicPartition, ListOffsetRequest.PartitionData> timestampsToSearch,
+                                                                  final 
Map<TopicPartition, ListOffsetPartition> timestampsToSearch,
                                                                   boolean 
requireTimestamp) {
         ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
                 .forConsumer(requireTimestamp, isolationLevel)
-                .setTargetTimes(timestampsToSearch);
+                .setTargetTimes(toListOffsetTopics(timestampsToSearch));

Review comment:
       I had a look at this and your are right. It seems that keeping 
`TopicPartition` is better and difficult to change. In this case, have you 
considered pushing the conversion to the `Builder` by providing an overload of 
`setTargetTimes` which accepts a `Map<TopicPartition, ListOffsetPartition>`? 
That could make the code in the `Fetcher` a bit cleaner.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to