dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r651831839



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4314,18 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+                        // check if there are any non MAX_TIMESTAMP partitions left to be downgraded
+                        return partitionsToQuery.stream().anyMatch(
+                            t -> t.partitions().stream().anyMatch(
+                                p -> p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP));
+                    }

Review comment:
       Would it make sense to directly fail the futures of the "max timestamp" requests here and to update `partitionsToQuery` so that it only contains the remaining types? That would consolidate all the fallback logic here, which would be simpler to follow. Then, we could add another boolean `requireMaxTimestamp` to `ListOffsetsRequest.Builder.forConsumer` and directly pass `supportsMaxTimestamp` to it. This way, `createRequest` and `handleResponse` would remain unchanged. Did you already consider this?
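
Roughly, I am thinking of something like the sketch below. Treat it as a sketch only: the `futures` map and the generated `ListOffsetsTopic`/`ListOffsetsPartition` types stand for whatever the surrounding `listOffsets` code already uses.

```java
@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
    if (supportsMaxTimestamp) {
        supportsMaxTimestamp = false;

        // fail the MAX_TIMESTAMP futures right away and drop those partitions from the retry
        Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
        while (topicIterator.hasNext()) {
            ListOffsetsTopic topic = topicIterator.next();
            Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
            while (partitionIterator.hasNext()) {
                ListOffsetsPartition partition = partitionIterator.next();
                if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
                    futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
                        .completeExceptionally(exception);
                    partitionIterator.remove();
                }
            }
            if (topic.partitions().isEmpty()) {
                topicIterator.remove();
            }
        }
        // only retry if there is anything left to query with the downgraded request
        return !partitionsToQuery.isEmpty();
    }
    return false;
}
```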

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new ArrayList<>());
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception maxTimestampException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(maxTimestampException.getCause() instanceof UnsupportedVersionException);
+
+            Exception nopResponseException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp1).get();
+            });
+            assertTrue(nopResponseException.getCause() instanceof ApiException);
+        }
+    }

Review comment:
       Could we also add a unit test for the happy path with max timestamp?
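
Something along these lines would do, I think. This is only a sketch that follows the pattern of the existing tests above; the method name and the response values are arbitrary.

```java
@Test
public void testListOffsetsMaxTimestampHappyPath() throws Exception {
    Node node = new Node(0, "localhost", 8120);
    List<Node> nodes = Collections.singletonList(node);
    final Cluster cluster = new Cluster(
        "mockClusterId",
        nodes,
        Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
        Collections.emptySet(),
        Collections.emptySet(),
        node);
    final TopicPartition tp0 = new TopicPartition("foo", 0);

    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

        // broker 0 answers the MAX_TIMESTAMP query with timestamp 123 at offset 345
        ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, 123L, 345L, 543);
        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
            .setThrottleTimeMs(0)
            .setTopics(Arrays.asList(topicResponse));
        env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);

        ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));

        ListOffsetsResultInfo tp0Offset = result.partitionResult(tp0).get();
        assertEquals(345L, tp0Offset.offset());
        assertEquals(543, tp0Offset.leaderEpoch().get().intValue());
        assertEquals(123L, tp0Offset.timestamp());
    }
}
```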

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName, 1, 1.asInstanceOf[Short])

Review comment:
       nit: You can use `.toShort`.
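
For example:

```scala
createTopic(topicName, 1, 1.toShort)
```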

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName,1,1.asInstanceOf[Short])
+    produceMessages()
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+    ).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+    assertEquals(0,earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    assertEquals(3,latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1,maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())")

Review comment:
       Yeah, let's remove it.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});

Review comment:
       nit: Could we indent the block such that `}});` is aligned with `ListOffsetsResult`? Same for other tests.
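
i.e. roughly:

```java
ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
    put(tp0, OffsetSpec.maxTimestamp());
    put(tp1, OffsetSpec.latest());
}});
```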

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get

Review comment:
       This block is common in all the test cases that have been added. Could we extract it into a helper method?
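
For instance something like the following (the name is only a suggestion, and the return type should be whatever `logManager.getLog` returns in this branch):

```scala
  private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = {
    createTopic(topic, 1, 1)
    val logManager = server.getLogManager
    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
      "Log for partition [topic,0] should be created")
    logManager.getLog(topicPartition).get
  }
```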

##########
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##########
@@ -162,11 +162,15 @@ class ListOffsetsRequestTest extends BaseRequestTest {
    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
-    TestUtils.generateAndProduceMessages(servers, topic, 10)
+    // produce in 2 batches to ensure the max timestamp matches the last message
+    TestUtils.generateAndProduceMessages(servers, topic, 9)
+    Thread.sleep(10)
+    TestUtils.generateAndProduceMessages(servers, topic, 1)

Review comment:
       Is this really necessary? If yes, could we remove the `sleep`?
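
If the two batches are really needed, one way to avoid the `sleep` would be to set the record timestamps explicitly so that the last record always carries the largest timestamp. A rough sketch, assuming `topic` and `brokerList` from the surrounding test:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
try {
  (0 until 10).foreach { i =>
    // explicit, increasing timestamps: the last record deterministically has the max timestamp
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, i.toLong, null, s"message-$i".getBytes)).get()
  }
} finally {
  producer.close()
}
```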

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);

Review comment:
       nit: I would move this block up, before calling `listOffsets`, in order to have the response ready. Same for other tests.
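
i.e. the same code, just reordered so the responses are prepared before the request is issued:

```java
// prepare all broker responses up front
env.kafkaClient().prepareUnsupportedVersionResponse(
    request -> request instanceof ListOffsetsRequest);

ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
ListOffsetsResponseData responseData = new ListOffsetsResponseData()
    .setThrottleTimeMs(0)
    .setTopics(Arrays.asList(topicResponse));
env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);

// ...and only then call listOffsets
ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
    put(tp0, OffsetSpec.maxTimestamp());
    put(tp1, OffsetSpec.latest());
}});
```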

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);

Review comment:
       nit: We could use `TestUtils.assertFutureThrows` here.
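
i.e. something like:

```java
TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
```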

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
        Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
       I think that we should address this as well. I am fine with doing it in a follow-up PR though so we can keep this focused. Ok for you?

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
    assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+
+    log.truncateTo(0)
+
+    val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+
+  }
+
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(7L, log.logEndOffset)
+    assertEquals(5L, maxTimestampOffset.get.offset)

Review comment:
       Should we check the timestamp here and in the other tests as well?
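
For this test that would be (6L being the largest timestamp appended):

```scala
assertEquals(6L, maxTimestampOffset.get.timestamp)
```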

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2072,6 +2072,30 @@ class LogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
   }
 
+  @Test
+  def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1

Review comment:
       Should we produce a third record with a timestamp lower than this one to ensure that the API returns the maximum one and not the latest?
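
Something like this at the end of the test, assuming the record with `secondTimestamp` ends up at offset 1:

```scala
    // a third record with a lower timestamp than the second one
    log.appendAsLeader(TestUtils.singletonRecords(
      value = TestUtils.randomBytes(10),
      timestamp = firstTimestamp),
      leaderEpoch = leaderEpoch)

    // MAX_TIMESTAMP should still resolve to the record carrying secondTimestamp, not to the latest record
    val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
    assertEquals(1L, maxTimestampOffset.get.offset)
    assertEquals(secondTimestamp, maxTimestampOffset.get.timestamp)
```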




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

