mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r490941989



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -2496,46 +2501,81 @@ public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
             client.updateMetadata(initialUpdateResponse);
 
             final long fetchTimestamp = 10L;
-            Map<TopicPartition, ListOffsetResponse.PartitionData> 
allPartitionData = new HashMap<>();
-            allPartitionData.put(tp0, new ListOffsetResponse.PartitionData(
-                Errors.NONE, fetchTimestamp, 4L, Optional.empty()));
-            allPartitionData.put(tp1, new ListOffsetResponse.PartitionData(
-                retriableError, ListOffsetRequest.LATEST_TIMESTAMP, -1L, 
Optional.empty()));
+            List<ListOffsetTopicResponse> topics = Collections.singletonList(
+                    new ListOffsetTopicResponse()
+                        .setName(tp0.topic())
+                        .setPartitions(Arrays.asList(
+                                new ListOffsetPartitionResponse()
+                                    .setPartitionIndex(tp0.partition())
+                                    .setErrorCode(Errors.NONE.code())
+                                    .setTimestamp(fetchTimestamp)
+                                    .setOffset(4L),
+                                new ListOffsetPartitionResponse()
+                                    .setPartitionIndex(tp1.partition())
+                                    .setErrorCode(retriableError.code())
+                                    
.setTimestamp(ListOffsetRequest.LATEST_TIMESTAMP)
+                                    .setOffset(-1L))));
+            ListOffsetResponseData data = new ListOffsetResponseData()
+                    .setThrottleTimeMs(0)
+                    .setTopics(topics);
 
             client.prepareResponseFrom(body -> {
                 boolean isListOffsetRequest = body instanceof 
ListOffsetRequest;
                 if (isListOffsetRequest) {
                     ListOffsetRequest request = (ListOffsetRequest) body;
-                    Map<TopicPartition, ListOffsetRequest.PartitionData> 
expectedTopicPartitions = new HashMap<>();
-                    expectedTopicPartitions.put(tp0, new 
ListOffsetRequest.PartitionData(
-                        fetchTimestamp, Optional.empty()));
-                    expectedTopicPartitions.put(tp1, new 
ListOffsetRequest.PartitionData(
-                        fetchTimestamp, Optional.empty()));
-
-                    return 
request.partitionTimestamps().equals(expectedTopicPartitions);
+                    List<ListOffsetTopic> expectedTopics = 
Collections.singletonList(
+                            new ListOffsetTopic()
+                                .setName(tp0.topic())
+                                .setPartitions(Arrays.asList(
+                                        new ListOffsetPartition()
+                                            .setPartitionIndex(tp1.partition())
+                                            .setTimestamp(fetchTimestamp)
+                                            
.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH),
+                                            new ListOffsetPartition()
+                                            .setPartitionIndex(tp0.partition())
+                                            .setTimestamp(fetchTimestamp)
+                                            
.setCurrentLeaderEpoch(ListOffsetResponse.UNKNOWN_EPOCH))));
+                    return request.topics().equals(expectedTopics);
                 } else {
                     return false;
                 }
-            }, new ListOffsetResponse(allPartitionData), originalLeader);
+            }, new ListOffsetResponse(data), originalLeader);
 
             client.prepareMetadataUpdate(updatedMetadata);
 
             // If the metadata wasn't updated before retrying, the fetcher would consult the original leader and hit a NOT_LEADER exception.
             // We will count the answered future response in the end to verify if this is the case.
-            Map<TopicPartition, ListOffsetResponse.PartitionData> 
paritionDataWithFatalError = new HashMap<>(allPartitionData);
-            paritionDataWithFatalError.put(tp1, new 
ListOffsetResponse.PartitionData(
-                Errors.NOT_LEADER_OR_FOLLOWER, 
ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
-            client.prepareResponseFrom(new 
ListOffsetResponse(paritionDataWithFatalError), originalLeader);
+            List<ListOffsetTopicResponse> topicsWithFatalError = 
Collections.singletonList(
+                    new ListOffsetTopicResponse()
+                        .setName(tp0.topic())
+                        .setPartitions(Arrays.asList(
+                                new ListOffsetPartitionResponse()

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to