junrao commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1187994355


##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._
+
+class DelayedRemoteFetchTest {
+  private val maxBytes = 1024
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, 
"topic")
+  private val fetchOffset = 500L
+  private val logStartOffset = 0L
+  private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val replicaId = 1
+
+  private val fetchStatus = FetchPartitionStatus(
+    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch))
+  private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 
500)
+
+  @Test
+  def testFetchWithFencedEpoch(): Unit = {

Review Comment:
   Hmm, where is the logic to simulate a fenced epoch in this test?



##########
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {
+    RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+    LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
+    Records records = mock(Records.class);
+
+

Review Comment:
   extra new line.



##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._
+
+class DelayedRemoteFetchTest {
+  private val maxBytes = 1024
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, 
"topic")
+  private val fetchOffset = 500L
+  private val logStartOffset = 0L
+  private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val replicaId = 1
+
+  private val fetchStatus = FetchPartitionStatus(
+    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch))
+  private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 
500)
+
+  @Test
+  def testFetchWithFencedEpoch(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    future.complete(null)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+    val highWatermark = 100
+    val leaderLogStartOffset = 10
+    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertTrue(actualTopicPartition.isDefined)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+    assertEquals(highWatermark, fetchResult.highWatermark)
+    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+  }
+
+  @Test
+  def testNotLeaderOrFollower(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    // throw exception while getPartition
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenThrow(new NotLeaderOrFollowerException(s"Replica for 
$topicIdPartition not available"))
+
+    val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+
+    val logReadInfo = buildReadResult(Errors.NONE)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    // delayed remote fetch should still be able to complete
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+    assertTrue(fetchResultOpt.isDefined)
+  }
+
+  @Test
+  def testErrorLogReadInfo(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    future.complete(null)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+
+    // build a read result with error
+    val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+    assertTrue(fetchResultOpt.isDefined)

Review Comment:
   This line duplicates the line before.



##########
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala:
##########
@@ -156,8 +156,7 @@ object AbstractCoordinatorConcurrencyTest {
   trait CoordinatorMember {
   }
 
-  class TestReplicaManager extends ReplicaManager(
-    null, null, null, null, null, null, null, null, null, null, null, null, 
null, null, null, None, null) {
+  class TestReplicaManager extends ReplicaManager(null, null, null, null, 
null, null, null, null, null, null, null, null, null, null, null, None, null) {

Review Comment:
   Do we need to change this since it makes the line quite long?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1153,14 +1193,15 @@ class ReplicaManager(val config: KafkaConfig,
       logReadResultMap.put(topicIdPartition, logReadResult)
     }
 
-    // respond immediately if 1) fetch request does not want to wait
+    // Respond immediately if 1) no remote fetches are required
     //                        2) fetch request does not require any data
-    //                        3) has enough data to respond
-    //                        4) some error happens while reading data
-    //                        5) we found a diverging epoch
-    //                        6) has a preferred read replica
-    if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= 
params.minBytes || errorReadingData ||
-      hasDivergingEpoch || hasPreferredReadReplica) {
+    //                        3) fetch request does not require any data

Review Comment:
   `2)` should be "fetch request does not want to wait". Also, the comment is 
written as if `1)` were one of the disjunctive conditions, but it is not.



##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._
+
+class DelayedRemoteFetchTest {
+  private val maxBytes = 1024
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, 
"topic")
+  private val fetchOffset = 500L
+  private val logStartOffset = 0L
+  private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val replicaId = 1
+
+  private val fetchStatus = FetchPartitionStatus(
+    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch))
+  private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 
500)
+
+  @Test
+  def testFetchWithFencedEpoch(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    future.complete(null)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+    val highWatermark = 100
+    val leaderLogStartOffset = 10
+    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertTrue(actualTopicPartition.isDefined)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+    assertEquals(highWatermark, fetchResult.highWatermark)
+    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+  }
+
+  @Test
+  def testNotLeaderOrFollower(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    // throw exception while getPartition
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenThrow(new NotLeaderOrFollowerException(s"Replica for 
$topicIdPartition not available"))
+
+    val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null, false)
+
+    val logReadInfo = buildReadResult(Errors.NONE)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    // delayed remote fetch should still be able to complete
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+    assertTrue(fetchResultOpt.isDefined)

Review Comment:
   This line duplicates the line before.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,25 +623,210 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = 
fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = 
epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            //  - there is no minimum-one-message constraint and
+            //  - the first batch size is more than maximum bytes that can be 
sent.
+            //  - for FetchRequest version 3 or above and

Review Comment:
   remove "and" at the end



##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._

Review Comment:
   Could we write the new test in Java?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to