abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r468735528



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4068,6 +4093,58 @@ public void testListOffsetsMetadataNonRetriableErrors() 
throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsPartialResponse() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        List<Node> nodes = Arrays.asList(node0, node1);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0, 
node1}, new Node[]{node0, node1}));
+        pInfos.add(new PartitionInfo("foo", 1, node0, new Node[]{node0, 
node1}, new Node[]{node0, node1}));
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes,
+                pInfos,
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(),
+                node0);
+
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, 
Errors.NONE));
+
+            ListOffsetTopicResponse t0 = prepareListOffsetTopicResponse(tp0, 
Errors.NONE, -2L, 123L, 456);
+            ListOffsetResponseData data = new ListOffsetResponseData()
+                    .setThrottleTimeMs(0)
+                    .setTopics(Arrays.asList(t0));
+            env.kafkaClient().prepareResponseFrom(new 
ListOffsetResponse(data), node0);
+
+            Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
+            partitions.put(tp0, OffsetSpec.latest());
+            partitions.put(tp1, OffsetSpec.latest());
+            ListOffsetsResult result = 
env.adminClient().listOffsets(partitions);
+            assertNotNull(result.partitionResult(tp0).get());
+            TestUtils.assertFutureThrows(result.partitionResult(tp1), 
ApiException.class);
+            TestUtils.assertFutureThrows(result.all(), ApiException.class);
+        }
+    }
+
+    private static ListOffsetTopicResponse 
prepareListOffsetTopicResponse(TopicPartition tp, Errors error, long timestamp, 
long offset, int epoch) {

Review comment:
       This could move to `ListOffsetResponse` as a helper, perhaps named 
`singletonListOffsetTopicResponse`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3973,25 +3977,38 @@ void handleResponse(AbstractResponse abstractResponse) {
                     ListOffsetResponse response = (ListOffsetResponse) 
abstractResponse;
                     Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets 
= new HashMap<>();
 
-                    for (Entry<TopicPartition, PartitionData> result : 
response.responseData().entrySet()) {
-                        TopicPartition tp = result.getKey();
-                        PartitionData partitionData = result.getValue();
-
-                        KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
-                        Errors error = partitionData.error;
-                        OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
-                        if (offsetRequestSpec == null) {
-                            future.completeExceptionally(new 
KafkaException("Unexpected topic partition " + tp + " in broker response!"));
-                        } else if 
(MetadataOperationContext.shouldRefreshMetadata(error)) {
-                            retryTopicPartitionOffsets.put(tp, 
offsetRequestSpec);
-                        } else if (error == Errors.NONE) {
-                            future.complete(new 
ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, 
partitionData.leaderEpoch));
-                        } else {
-                            future.completeExceptionally(error.exception());
+                    for (ListOffsetTopicResponse topic : response.topics()) {
+                        for (ListOffsetPartitionResponse partition : 
topic.partitions()) {
+                            TopicPartition tp = new 
TopicPartition(topic.name(), partition.partitionIndex());
+                            KafkaFutureImpl<ListOffsetsResultInfo> future = 
futures.get(tp);
+                            Errors error = 
Errors.forCode(partition.errorCode());
+                            OffsetSpec offsetRequestSpec = 
topicPartitionOffsets.get(tp);
+                            if (offsetRequestSpec == null) {

Review comment:
       Why do we downgrade this fatal scenario to a warning?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -307,32 +169,16 @@ public static ListOffsetRequest parse(ByteBuffer buffer, 
short version) {
 
     @Override
     protected Struct toStruct() {
-        short version = version();
-        Struct struct = new 
Struct(ApiKeys.LIST_OFFSETS.requestSchema(version));
-        Map<String, Map<Integer, PartitionData>> topicsData = 
CollectionUtils.groupPartitionDataByTopic(partitionTimestamps);
-
-        struct.set(REPLICA_ID, replicaId);
-        struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id());
+        return data.toStruct(version());
+    }
 
-        List<Struct> topicArray = new ArrayList<>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: 
topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS);
-            topicData.set(TOPIC_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.getValue().entrySet()) {
-                PartitionData offsetPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS);
-                partitionData.set(PARTITION_ID, partitionEntry.getKey());
-                partitionData.set(TIMESTAMP, offsetPartitionData.timestamp);
-                partitionData.setIfExists(MAX_NUM_OFFSETS, 
offsetPartitionData.maxNumOffsets);
-                RequestUtils.setLeaderEpochIfExists(partitionData, 
CURRENT_LEADER_EPOCH,
-                        offsetPartitionData.currentLeaderEpoch);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS, partitionArray.toArray());
-            topicArray.add(topicData);
+    public static List<ListOffsetTopic> toListOffsetTopics(Map<TopicPartition, 
ListOffsetPartition> timestampsToSearch) {

Review comment:
       Could we add a unit test for this function?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -2149,20 +2163,29 @@ private ListOffsetResponse 
listOffsetsResponse(Map<TopicPartition, Long> offsets
 
     private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> 
partitionOffsets,
                                                    Map<TopicPartition, Errors> 
partitionErrors) {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = 
new HashMap<>();
+        Map<String, ListOffsetTopicResponse> responses = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> partitionOffset : 
partitionOffsets.entrySet()) {
-            partitionData.put(partitionOffset.getKey(), new 
ListOffsetResponse.PartitionData(Errors.NONE,
-                    ListOffsetResponse.UNKNOWN_TIMESTAMP, 
partitionOffset.getValue(),
-                    Optional.empty()));
+            TopicPartition tp = partitionOffset.getKey();
+            ListOffsetTopicResponse topic = 
responses.computeIfAbsent(tp.topic(), k -> new 
ListOffsetTopicResponse().setName(tp.topic()));
+            topic.partitions().add(new ListOffsetPartitionResponse()
+                    .setPartitionIndex(tp.partition())
+                    .setErrorCode(Errors.NONE.code())
+                    .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+                    .setOffset(partitionOffset.getValue()));
         }
 
         for (Map.Entry<TopicPartition, Errors> partitionError : 
partitionErrors.entrySet()) {
-            partitionData.put(partitionError.getKey(), new 
ListOffsetResponse.PartitionData(
-                    partitionError.getValue(), 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                    ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty()));
+            TopicPartition tp = partitionError.getKey();
+            ListOffsetTopicResponse topic = 
responses.computeIfAbsent(tp.topic(), k -> new 
ListOffsetTopicResponse().setName(tp.topic()));
+            topic.partitions().add(new ListOffsetPartitionResponse()
+                    .setPartitionIndex(tp.partition())
+                    .setErrorCode(partitionError.getValue().code())
+                    .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+                    .setOffset(ListOffsetResponse.UNKNOWN_OFFSET));
         }
-
-        return new ListOffsetResponse(partitionData);
+        ListOffsetResponseData data = new ListOffsetResponseData()
+                .setTopics(new 
ArrayList<ListOffsetTopicResponse>(responses.values()));

Review comment:
       `new ArrayList<>(responses.values())` suffices; the explicit type 
argument is unnecessary.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3546,12 +3594,54 @@ private void testGetOffsetsForTimesWithUnknownOffset() {
         MetadataResponse initialMetadataUpdate = 
TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
         client.updateMetadata(initialMetadataUpdate);
 
-        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = 
new HashMap<>();
-        partitionData.put(tp0, new 
ListOffsetResponse.PartitionData(Errors.NONE,
-                ListOffsetResponse.UNKNOWN_TIMESTAMP, 
ListOffsetResponse.UNKNOWN_OFFSET,
-                Optional.empty()));
+        ListOffsetResponseData data = new ListOffsetResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Collections.singletonList(new 
ListOffsetTopicResponse()
+                        .setName(tp0.topic())
+                        .setPartitions(Collections.singletonList(new 
ListOffsetPartitionResponse()
+                                .setPartitionIndex(tp0.partition())
+                                .setErrorCode(Errors.NONE.code())
+                                
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+                                
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)))));
+
+        client.prepareResponseFrom(new ListOffsetResponse(data),
+                metadata.fetch().leaderFor(tp0));
+
+        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+        timestampToSearch.put(tp0, 0L);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+                fetcher.offsetsForTimes(timestampToSearch, 
time.timer(Long.MAX_VALUE));
 
-        client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
+        assertTrue(offsetAndTimestampMap.containsKey(tp0));
+        assertNull(offsetAndTimestampMap.get(tp0));
+    }
+
+    @Test
+    public void testGetOffsetsForTimesWithUnknownOffsetV0() {

Review comment:
       Is this expected before the change in this PR?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -910,136 +913,162 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderOrFollowerException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context,
+        DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava)
+    )
 
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
 
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderOrFollowerException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    def buildErrorResponse(e: Errors, partition: ListOffsetPartition): 
ListOffsetPartitionResponse = {
+      new ListOffsetPartitionResponse()
+        .setPartitionIndex(partition.partitionIndex)
+        .setErrorCode(e.code)
+        .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+        .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+    }
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context,
+        DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          buildErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, 
partition)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          buildErrorResponse(Errors.INVALID_REQUEST, partition)
+        } else {
+

Review comment:
       nit: this blank line seems unnecessary.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3732,41 +3734,43 @@ public void testListOffsets() throws Exception {
                 Collections.<String>emptySet(),
                 node0);
 
-        final TopicPartition tp1 = new TopicPartition("foo", 0);
-        final TopicPartition tp2 = new TopicPartition("bar", 0);
-        final TopicPartition tp3 = new TopicPartition("baz", 0);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);

Review comment:
       I think we could reduce the size of this PR's diff by reverting the 
variable renumbering, which seems unnecessary.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##########
@@ -584,14 +590,22 @@ public void 
testFetchProgressWithMissingPartitionPosition() {
         consumer.seekToEnd(singleton(tp0));
         consumer.seekToBeginning(singleton(tp1));
 
-        client.prepareResponse(
-            body -> {
-                ListOffsetRequest request = (ListOffsetRequest) body;
-                Map<TopicPartition, ListOffsetRequest.PartitionData> 
timestamps = request.partitionTimestamps();
-                return timestamps.get(tp0).timestamp == 
ListOffsetRequest.LATEST_TIMESTAMP &&
-                        timestamps.get(tp1).timestamp == 
ListOffsetRequest.EARLIEST_TIMESTAMP;
-            }, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
-                        Collections.singletonMap(tp1, 
Errors.NOT_LEADER_OR_FOLLOWER)));
+        client.prepareResponse(body -> {
+            ListOffsetRequest request = (ListOffsetRequest) body;
+            List<ListOffsetPartition> partitions = 
request.topics().stream().flatMap(topic -> {
+                if (topic.name().equals(tp0.topic()))

Review comment:
       This looks odd: we only filter for tp0's partitions here, so how can we 
check both tp0 and tp1 later?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1221,28 +1226,39 @@ private DeleteGroupsResponse 
createDeleteGroupsResponse() {
 
     private ListOffsetRequest createListOffsetRequest(int version) {
         if (version == 0) {
-            Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = 
Collections.singletonMap(
-                    new TopicPartition("test", 0),
-                    new ListOffsetRequest.PartitionData(1000000L, 10));
+            ListOffsetTopic topic = new ListOffsetTopic()

Review comment:
       Well, the purpose is more about encapsulation to reduce the import paths 
in this test class.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3546,12 +3594,54 @@ private void testGetOffsetsForTimesWithUnknownOffset() {
         MetadataResponse initialMetadataUpdate = 
TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
         client.updateMetadata(initialMetadataUpdate);
 
-        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = 
new HashMap<>();
-        partitionData.put(tp0, new 
ListOffsetResponse.PartitionData(Errors.NONE,
-                ListOffsetResponse.UNKNOWN_TIMESTAMP, 
ListOffsetResponse.UNKNOWN_OFFSET,
-                Optional.empty()));
+        ListOffsetResponseData data = new ListOffsetResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Collections.singletonList(new 
ListOffsetTopicResponse()
+                        .setName(tp0.topic())
+                        .setPartitions(Collections.singletonList(new 
ListOffsetPartitionResponse()
+                                .setPartitionIndex(tp0.partition())
+                                .setErrorCode(Errors.NONE.code())
+                                
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+                                
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)))));
+
+        client.prepareResponseFrom(new ListOffsetResponse(data),
+                metadata.fetch().leaderFor(tp0));
+
+        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+        timestampToSearch.put(tp0, 0L);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+                fetcher.offsetsForTimes(timestampToSearch, 
time.timer(Long.MAX_VALUE));
 
-        client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
+        assertTrue(offsetAndTimestampMap.containsKey(tp0));
+        assertNull(offsetAndTimestampMap.get(tp0));
+    }
+
+    @Test
+    public void testGetOffsetsForTimesWithUnknownOffsetV0() {
+        buildFetcher();
+        // Empty map
+        assertTrue(fetcher.offsetsForTimes(new HashMap<>(), 
time.timer(100L)).isEmpty());
+        // Unknown Offset
+        client.reset();
+        // Ensure metadata has both partition.

Review comment:
       both partitions

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.INVALID_REQUEST.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+        } else {
+  
+          def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = {
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(e.code)
+              .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+              .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+          }
+  
+          try {
+            val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
+            val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
+            val isolationLevelOpt = if (isClientRequest)
+              Some(offsetRequest.isolationLevel)
+            else
+              None
+  
+            val foundOpt = 
replicaManager.fetchOffsetForTimestamp(topicPartition,
+              partition.timestamp,
+              isolationLevelOpt,
+              if (partition.currentLeaderEpoch == 
ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else 
Optional.of(partition.currentLeaderEpoch),

Review comment:
       @mimaison WDYT? I'm neutral.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetRequestTest.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData;
+import 
org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
+import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
+import org.apache.kafka.common.message.ListOffsetResponseData;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+public class ListOffsetRequestTest {

Review comment:
       Probably ok to skip.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1251,15 +1267,28 @@ private ListOffsetRequest createListOffsetRequest(int 
version) {
 
     private ListOffsetResponse createListOffsetResponse(int version) {
         if (version == 0) {
-            Map<TopicPartition, ListOffsetResponse.PartitionData> responseData 
= new HashMap<>();
-            responseData.put(new TopicPartition("test", 0),
-                    new ListOffsetResponse.PartitionData(Errors.NONE, 
asList(100L)));
-            return new ListOffsetResponse(responseData);
+            ListOffsetResponseData data = new ListOffsetResponseData()

Review comment:
       As my previous comment suggests, for the sake of encapsulation and 
reusability.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -910,136 +913,162 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderOrFollowerException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context,
+        DESCRIBE, TOPIC, offsetRequest.topics.asScala.toSeq)(_.name)
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)).asJava)
+    )
 
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
 
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cased since these error messages

Review comment:
       special cases?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to