[ https://issues.apache.org/jira/browse/KAFKA-7568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672252#comment-16672252 ]
ASF GitHub Bot commented on KAFKA-7568:
---------------------------------------

hachikuji closed pull request #5855: KAFKA-7568; Return leader epoch in ListOffsets response
URL: https://github.com/apache/kafka/pull/5855
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 3537fc34bb0..d723ba091b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -32,6 +32,8 @@
 import java.nio.channels.GatheringByteChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -320,7 +322,8 @@ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingP
                 for (Record record : batch) {
                     long timestamp = record.timestamp();
                     if (timestamp >= targetTimestamp && record.offset() >= startingOffset)
-                        return new TimestampAndOffset(timestamp, record.offset());
+                        return new TimestampAndOffset(timestamp, record.offset(),
+                                maybeLeaderEpoch(batch.partitionLeaderEpoch()));
                 }
             }
         }
@@ -335,15 +338,23 @@ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingP
     public TimestampAndOffset largestTimestampAfter(int startingPosition) {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long offsetOfMaxTimestamp = -1L;
+        int leaderEpochOfMaxTimestamp = RecordBatch.NO_PARTITION_LEADER_EPOCH;
 
         for (RecordBatch batch : batchesFrom(startingPosition)) {
             long timestamp = batch.maxTimestamp();
             if (timestamp > maxTimestamp) {
                 maxTimestamp = timestamp;
                 offsetOfMaxTimestamp = batch.lastOffset();
+                leaderEpochOfMaxTimestamp = batch.partitionLeaderEpoch();
             }
         }
-        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp);
+        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp,
+                maybeLeaderEpoch(leaderEpochOfMaxTimestamp));
+    }
+
+    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
+        return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
+                Optional.empty() : Optional.of(leaderEpoch);
     }
 
     /**
@@ -492,28 +503,27 @@ public String toString() {
     public static class TimestampAndOffset {
         public final long timestamp;
         public final long offset;
+        public final Optional<Integer> leaderEpoch;
 
-        public TimestampAndOffset(long timestamp, long offset) {
+        public TimestampAndOffset(long timestamp, long offset, Optional<Integer> leaderEpoch) {
             this.timestamp = timestamp;
             this.offset = offset;
+            this.leaderEpoch = leaderEpoch;
         }
 
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
-
             TimestampAndOffset that = (TimestampAndOffset) o;
-
-            if (timestamp != that.timestamp) return false;
-            return offset == that.offset;
+            return timestamp == that.timestamp &&
+                    offset == that.offset &&
+                    Objects.equals(leaderEpoch, that.leaderEpoch);
         }
 
         @Override
         public int hashCode() {
-            int result = (int) (timestamp ^ (timestamp >>> 32));
-            result = 31 * result + (int) (offset ^ (offset >>> 32));
-            return result;
+            return Objects.hash(timestamp, offset, leaderEpoch);
         }
 
         @Override
@@ -521,6 +531,7 @@ public String toString() {
             return "TimestampAndOffset(" +
                     "timestamp=" + timestamp +
                     ", offset=" + offset +
+                    ", leaderEpoch=" + leaderEpoch +
                     ')';
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 637da9386e5..1945bccb613 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -34,6 +34,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.utf8;
@@ -41,6 +42,8 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -318,12 +321,12 @@ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException {
     @Test
     public void testPreallocateTrue() throws IOException {
         File temp = tempFile();
-        FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+        FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);
         long position = fileRecords.channel().position();
         int size = fileRecords.sizeInBytes();
         assertEquals(0, position);
         assertEquals(0, size);
-        assertEquals(512 * 1024 * 1024, temp.length());
+        assertEquals(1024 * 1024, temp.length());
     }
 
     /**
@@ -332,7 +335,7 @@ public void testPreallocateTrue() throws IOException {
     @Test
     public void testPreallocateFalse() throws IOException {
         File temp = tempFile();
-        FileRecords set = FileRecords.open(temp, false, 512 * 1024 * 1024, false);
+        FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false);
         long position = set.channel().position();
         int size = set.sizeInBytes();
         assertEquals(0, position);
@@ -346,7 +349,7 @@ public void testPreallocateFalse() throws IOException {
     @Test
     public void testPreallocateClearShutdown() throws IOException {
         File temp = tempFile();
-        FileRecords fileRecords = FileRecords.open(temp, false, 512 * 1024 * 1024, true);
+        FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);
         append(fileRecords, values);
 
         int oldPosition = (int) fileRecords.channel().position();
@@ -356,7 +359,7 @@ public void testPreallocateClearShutdown() throws IOException {
         fileRecords.close();
 
         File tempReopen = new File(temp.getAbsolutePath());
-        FileRecords setReopen = FileRecords.open(tempReopen, true, 512 * 1024 * 1024, true);
+        FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true);
         int position = (int) setReopen.channel().position();
         int size = setReopen.sizeInBytes();
 
@@ -382,6 +385,55 @@ public void testFormatConversionWithPartialMessage() throws IOException {
         assertTrue("No messages should be returned", !it.hasNext());
     }
 
+    @Test
+    public void testSearchForTimestamp() throws IOException {
+        for (RecordVersion version : RecordVersion.values()) {
+            testSearchForTimestamp(version);
+        }
+    }
+
+    private void testSearchForTimestamp(RecordVersion version) throws IOException {
+        File temp = tempFile();
+        FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true);
+        appendWithOffsetAndTimestamp(fileRecords, version, 10L, 5, 0);
+        appendWithOffsetAndTimestamp(fileRecords, version, 11L, 6, 1);
+
+        assertFoundTimestamp(new FileRecords.TimestampAndOffset(10L, 5, Optional.of(0)),
+                fileRecords.searchForTimestamp(9L, 0, 0L), version);
+        assertFoundTimestamp(new FileRecords.TimestampAndOffset(10L, 5, Optional.of(0)),
+                fileRecords.searchForTimestamp(10L, 0, 0L), version);
+        assertFoundTimestamp(new FileRecords.TimestampAndOffset(11L, 6, Optional.of(1)),
+                fileRecords.searchForTimestamp(11L, 0, 0L), version);
+        assertNull(fileRecords.searchForTimestamp(12L, 0, 0L));
+    }
+
+    private void assertFoundTimestamp(FileRecords.TimestampAndOffset expected,
+                                      FileRecords.TimestampAndOffset actual,
+                                      RecordVersion version) {
+        if (version == RecordVersion.V0) {
+            assertNull("Expected no match for message format v0", actual);
+        } else {
+            assertNotNull("Expected to find timestamp for message format " + version, actual);
+            assertEquals("Expected matching timestamps for message format " + version, expected.timestamp, actual.timestamp);
+            assertEquals("Expected matching offsets for message format " + version, expected.offset, actual.offset);
+            Optional<Integer> expectedLeaderEpoch = version.value >= RecordVersion.V2.value ?
+                    expected.leaderEpoch : Optional.empty();
+            assertEquals("Non-matching leader epoch for version " + version, expectedLeaderEpoch, actual.leaderEpoch);
+        }
+    }
+
+    private void appendWithOffsetAndTimestamp(FileRecords fileRecords,
+                                              RecordVersion recordVersion,
+                                              long timestamp,
+                                              long offset,
+                                              int leaderEpoch) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, recordVersion.value,
+                CompressionType.NONE, TimestampType.CREATE_TIME, offset, timestamp, leaderEpoch);
+        builder.append(new SimpleRecord(timestamp, new byte[0], new byte[0]));
+        fileRecords.append(builder.build());
+    }
+
     @Test
     public void testConversion() throws IOException {
         doTestConversion(CompressionType.NONE, RecordBatch.MAGIC_VALUE_V0);
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 024fdccbd75..819da2cfd42 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests._
@@ -803,7 +804,7 @@ class Partition(val topicPartition: TopicPartition,
   def fetchOffsetForTimestamp(timestamp: Long,
                               isolationLevel: Option[IsolationLevel],
                               currentLeaderEpoch: Optional[Integer],
-                              fetchOnlyFromLeader: Boolean): TimestampOffset = inReadLock(leaderIsrUpdateLock) {
+                              fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = inReadLock(leaderIsrUpdateLock) {
     // decide whether to only fetch from leader
     val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
 
@@ -814,16 +815,16 @@ class Partition(val topicPartition: TopicPartition,
     }
 
     if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
-      TimestampOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset)
+      Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch)))
     } else {
-      def allowed(timestampOffset: TimestampOffset): Boolean =
+      def allowed(timestampOffset: TimestampAndOffset): Boolean =
         timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP || timestampOffset.offset < lastFetchableOffset
 
       val fetchedOffset = logManager.getLog(topicPartition).flatMap { log =>
-        log.fetchOffsetsByTimestamp(timestamp)
+        log.fetchOffsetByTimestamp(timestamp)
       }
 
-      fetchedOffset.filter(allowed).getOrElse(TimestampOffset.Unknown)
+      fetchedOffset.filter(allowed)
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bc328d77efc..19e2f2f5424 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -22,6 +22,7 @@ import java.lang.{Long => JLong}
 import java.nio.file.{Files, NoSuchFileException}
 import java.text.NumberFormat
 import java.util.Map.{Entry => JEntry}
+import java.util.Optional
 import java.util.concurrent.atomic._
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
 import java.util.regex.Pattern
@@ -36,6 +37,7 @@ import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.ListOffsetRequest
@@ -1263,7 +1265,7 @@ class Log(@volatile var dir: File,
   * @return The offset of the first message whose timestamp is greater than or equals to the given timestamp.
   *         None if no such message is found.
   */
-  def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = {
+  def fetchOffsetByTimestamp(targetTimestamp: Long): Option[TimestampAndOffset] = {
     maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") {
       debug(s"Searching offset for timestamp $targetTimestamp")
 
@@ -1278,10 +1280,24 @@ class Log(@volatile var dir: File,
       // constant time access while being safe to use with concurrent collections unlike `toArray`.
       val segmentsCopy = logSegments.toBuffer
       // For the earliest and latest, we do not need to return the timestamp.
-      if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
-        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
-      else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
-        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
+      if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) {
+        // The first cached epoch usually corresponds to the log start offset, but we have to verify this since
+        // it may not be true following a message format version bump as the epoch will not be available for
+        // log entries written in the older format.
+        val earliestEpochEntry = leaderEpochCache.earliestEntry
+        val epochOpt = earliestEpochEntry match {
+          case Some(entry) if entry.startOffset <= logStartOffset => Optional.of[Integer](entry.epoch)
+          case _ => Optional.empty[Integer]()
+        }
+        return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
+      } else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) {
+        val latestEpoch = leaderEpochCache.latestEpoch
+        val epochOpt = if (latestEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
+          Optional.empty[Integer]()
+        else
+          Optional.of[Integer](latestEpoch)
+        return Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOpt))
+      }
 
       val targetSeg = {
         // Get all the segments whose largest timestamp is smaller than target timestamp
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d910a29100c..a700aeb78d4 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -27,7 +27,7 @@ import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.record.FileRecords.LogOffsetPosition
+import org.apache.kafka.common.record.FileRecords.{LogOffsetPosition, TimestampAndOffset}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
@@ -546,15 +546,13 @@ class LogSegment private[log] (val log: FileRecords,
    * @param startingOffset The starting offset to search.
    * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there is no such message.
    */
-  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = {
+  def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampAndOffset] = {
     // Get the index entry with a timestamp less than or equal to the target timestamp
     val timestampOffset = timeIndex.lookup(timestamp)
     val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position
 
     // Search the timestamp
-    Option(log.searchForTimestamp(timestamp, position, startingOffset)).map { timestampAndOffset =>
-      TimestampOffset(timestampAndOffset.timestamp, timestampAndOffset.offset)
-    }
+    Option(log.searchForTimestamp(timestamp, position, startingOffset))
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e3dc9217d45..be6736d9c07 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -817,14 +817,20 @@ class KafkaApis(val requestChannel: RequestChannel,
           else
             None
 
-          val found = replicaManager.fetchOffsetForTimestamp(topicPartition,
+          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
             partitionData.timestamp,
             isolationLevelOpt,
             partitionData.currentLeaderEpoch,
             fetchOnlyFromLeader)
 
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset,
-            Optional.empty()))
+          val response = foundOpt match {
+            case Some(found) =>
+              new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset, found.leaderEpoch)
+            case None =>
+              new ListOffsetResponse.PartitionData(Errors.NONE, ListOffsetResponse.UNKNOWN_TIMESTAMP,
+                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
+          }
+          (topicPartition, response)
         } catch {
           // NOTE: These exceptions are special cased since these error messages are typically transient or the client
           // would have received a clear exception and there is no value in logging the entire stack trace for the same
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e3feb7194c6..1146befdc8e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -45,6 +45,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -773,7 +774,7 @@ class ReplicaManager(val config: KafkaConfig,
                               timestamp: Long,
                               isolationLevel: Option[IsolationLevel],
                               currentLeaderEpoch: Optional[Integer],
-                              fetchOnlyFromLeader: Boolean): TimestampOffset = {
+                              fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = {
     val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
     partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader)
   }
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index cee6bb66bdf..f47d3bd007a 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -85,17 +85,24 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
   }
 
   /**
-    * Returns the current Leader Epoch. This is the latest epoch
-    * which has messages assigned to it.
-    *
-    * @return
-    */
+   * Returns the current Leader Epoch. This is the latest epoch
+   * which has messages assigned to it.
+   */
   def latestEpoch: Int = {
     inReadLock(lock) {
       if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch
     }
   }
 
+  /**
+   * Get the earliest cached entry if one exists.
+   */
+  def earliestEntry: Option[EpochEntry] = {
+    inReadLock(lock) {
+      epochs.headOption
+    }
+  }
+
   /**
     * Returns the Leader Epoch and the End Offset for a requested Leader Epoch.
     *
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 0c637755d24..42b3984e305 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1214,16 +1214,27 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     //  topic3Partition0 -> 80,
     //  topic3Partition1 -> 100)
     val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch)
-    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).offset())
-    assertEquals(0, timestampOffsets.get(new TopicPartition(topic1, 0)).timestamp())
-    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).offset())
-    assertEquals(20, timestampOffsets.get(new TopicPartition(topic1, 1)).timestamp())
+
+    val timestampTopic1P0 = timestampOffsets.get(new TopicPartition(topic1, 0))
+    assertEquals(0, timestampTopic1P0.offset)
+    assertEquals(0, timestampTopic1P0.timestamp)
+    assertEquals(Optional.of(0), timestampTopic1P0.leaderEpoch)
+
+    val timestampTopic1P1 = timestampOffsets.get(new TopicPartition(topic1, 1))
+    assertEquals(20, timestampTopic1P1.offset)
+    assertEquals(20, timestampTopic1P1.timestamp)
+    assertEquals(Optional.of(0), timestampTopic1P1.leaderEpoch)
+
     assertEquals("null should be returned when message format is 0.9.0",
       null, timestampOffsets.get(new TopicPartition(topic2, 0)))
     assertEquals("null should be returned when message format is 0.9.0",
       null, timestampOffsets.get(new TopicPartition(topic2, 1)))
-    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).offset())
-    assertEquals(80, timestampOffsets.get(new TopicPartition(topic3, 0)).timestamp())
+
+    val timestampTopic3P0 = timestampOffsets.get(new TopicPartition(topic3, 0))
+    assertEquals(80, timestampTopic3P0.offset)
+    assertEquals(80, timestampTopic3P0.timestamp)
+    assertEquals(Optional.of(0), timestampTopic3P0.leaderEpoch)
+
     assertEquals(null, timestampOffsets.get(new TopicPartition(topic3, 1)))
   }
 
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b5b271e2d44..a075bd05220 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ReplicaNotAvailableException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, ListOffsetRequest}
@@ -365,6 +366,21 @@ class PartitionTest {
     assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
   }
 
+  @Test
+  def testFetchLatestOffsetIncludesLeaderEpoch(): Unit = {
+    val leaderEpoch = 5
+    val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
+
+    val timestampAndOffsetOpt = partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
+      isolationLevel = None,
+      currentLeaderEpoch = Optional.empty(),
+      fetchOnlyFromLeader = true)
+
+    assertTrue(timestampAndOffsetOpt.isDefined)
+
+    val timestampAndOffset = timestampAndOffsetOpt.get
+    assertEquals(Optional.of(leaderEpoch), timestampAndOffset.leaderEpoch)
+  }
 
   private def setupPartitionWithMocks(leaderEpoch: Int,
                                       isLeader: Boolean,
@@ -503,18 +519,22 @@ class PartitionTest {
       baseOffset = 0L)
     partition.appendRecordsToLeader(records, isFromClient = true)
 
-    def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampOffset = {
-      partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
+    def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
+      val res = partition.fetchOffsetForTimestamp(ListOffsetRequest.LATEST_TIMESTAMP,
         isolationLevel = isolationLevel,
         currentLeaderEpoch = Optional.empty(),
         fetchOnlyFromLeader = true)
+      assertTrue(res.isDefined)
+      res.get
     }
 
-    def fetchEarliestOffset(isolationLevel: Option[IsolationLevel]): TimestampOffset = {
-      partition.fetchOffsetForTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP,
+    def fetchEarliestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
+      val res = partition.fetchOffsetForTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP,
         isolationLevel = isolationLevel,
         currentLeaderEpoch = Optional.empty(),
         fetchOnlyFromLeader = true)
+      assertTrue(res.isDefined)
+      res.get
     }
 
     assertEquals(3L, fetchLatestOffset(isolationLevel = None).offset)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 77289989546..c37da515901 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.{Files, Paths}
-import java.util.Properties
+import java.util.{Optional, Properties}
 
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
@@ -30,10 +30,12 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailure
 import kafka.utils._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
+import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.EasyMock
 import org.junit.Assert._
@@ -1647,13 +1649,52 @@ class LogTest {
     for(i <- 0 until numMessages) {
       assertEquals(i, readLog(log, i, 100).records.batches.iterator.next().lastOffset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
 
+  @Test
+  def testFetchOffsetByTimestampIncludesLeaderEpoch(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val firstTimestamp = mockTime.milliseconds
+    val firstLeaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = firstLeaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    val secondLeaderEpoch = 1
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = secondTimestamp),
+      leaderEpoch = secondLeaderEpoch)
+
+    assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
+      log.fetchOffsetByTimestamp(firstTimestamp))
+    assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
+      log.fetchOffsetByTimestamp(secondTimestamp))
+
+    assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP))
+    assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetRequest.LATEST_TIMESTAMP))
+
+    // The cache can be updated directly after a leader change.
+    // The new latest offset should reflect the updated epoch.
+    log.leaderEpochCache.assign(2, 2L)
+
+    assertEquals(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
+      log.fetchOffsetByTimestamp(ListOffsetRequest.LATEST_TIMESTAMP))
+  }
+
   /**
    * Test that if messages format version of the messages in a segment is before 0.10.0, the time index should be empty.
    */
@@ -1715,9 +1756,9 @@ class LogTest {
     for(i <- 0 until numMessages) {
       assertEquals(i, readLog(log, i, 100).records.batches.iterator.next().lastOffset)
       if (i == 0)
-        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
       else
-        assertEquals(i, log.fetchOffsetsByTimestamp(mockTime.milliseconds + i * 10).get.offset)
+        assertEquals(i, log.fetchOffsetByTimestamp(mockTime.milliseconds + i * 10).get.offset)
     }
     log.close()
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a10800f8f38..dd016f470b0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
@@ -464,7 +465,7 @@ class KafkaApisTest {
 
     EasyMock.expect(replicaManager.fetchOffsetForTimestamp(tp, ListOffsetRequest.LATEST_TIMESTAMP,
       Some(isolationLevel), currentLeaderEpoch, fetchOnlyFromLeader = true))
-      .andReturn(TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset))
+      .andReturn(Some(new TimestampAndOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, latestOffset, currentLeaderEpoch)))
 
     val capturedResponse = expectNoThrottling()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel)
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 7f9b3e411aa..9c97c1a448d 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -111,6 +111,50 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
   }
 
+  @Test
+  def testResponseIncludesLeaderEpoch(): Unit = {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val firstLeaderId = partitionToLeader(topicPartition.partition)
+
+    TestUtils.generateAndProduceMessages(servers, topic, 10)
+
+    def fetchOffsetAndEpoch(serverId: Int,
+                            timestamp: Long): (Long, Int) = {
+      val targetTimes = Map(topicPartition -> new ListOffsetRequest.PartitionData(
+        timestamp, Optional.empty[Integer]())).asJava
+
+      val request = ListOffsetRequest.Builder
+        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+        .setTargetTimes(targetTimes)
+        .build()
+
+      val response = sendRequest(serverId, request)
+      val partitionData = response.responseData.get(topicPartition)
+      val epochOpt = partitionData.leaderEpoch
+      assertTrue(epochOpt.isPresent)
+
+      (partitionData.offset, epochOpt.get)
+    }
+
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
+    assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+
+    // Kill the first leader so that we can verify the epoch change when fetching the latest offset
+    killBroker(firstLeaderId)
+    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, topicPartition, servers)
+
+    // No changes to written data
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.EARLIEST_TIMESTAMP))
+
+    // The latest offset reflects the updated epoch
+    assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetRequest.LATEST_TIMESTAMP))
+  }
+
   private def assertResponseError(error: Errors, brokerId: Int, request: ListOffsetRequest): Unit = {
     val response = sendRequest(brokerId, request)
     assertEquals(request.partitionTimestamps.size, response.responseData.size)


 



> Return leader epoch in ListOffsets responses
> --------------------------------------------
>
>                 Key: KAFKA-7568
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7568
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major
>             Fix For: 2.2.0
>
>
> This is part of KIP-320. The changes to the API have already been made, but 
> currently we return unknown epoch. We need to update the logic to search for 
> the epoch corresponding to a fetched offset in the leader epoch cache.
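
For reference, below is a minimal consumer-side sketch (not part of the patch) of how the epoch returned by ListOffsets surfaces through the Java client once this change is in place. It assumes a broker at localhost:9092 and a single-partition topic named "topic", and relies on the KIP-320 accessor OffsetAndTimestamp.leaderEpoch():

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsForTimesEpochExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("topic", 0); // assumed topic/partition

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));

            // Ask the broker for the first offset whose timestamp is >= 0L,
            // i.e. the earliest record in the partition.
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, 0L));

            OffsetAndTimestamp found = result.get(tp);
            if (found != null) {
                // With this change the broker also fills in the leader epoch of the
                // batch containing the matched offset (empty for pre-v2 message formats).
                System.out.println("offset=" + found.offset()
                        + ", timestamp=" + found.timestamp()
                        + ", leaderEpoch=" + found.leaderEpoch());
            }
        }
    }
}

The returned epoch can then be fed into the KIP-320 offset validation path so a consumer can detect log truncation after a leader change.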


