junrao commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1184289360


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1160,48 +1172,100 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the 
purgatory;
-      // this is because while the delayed fetch operation is being created, 
new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {
+        val key = new 
TopicPartitionOperationKey(remoteFetchInfo.get.topicPartition.topic(), 
remoteFetchInfo.get.topicPartition.partition())
+        val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
+        var remoteFetchTask: Future[Void] = null
+        try {
+          remoteFetchTask = 
remoteLogManager.get.asyncRead(remoteFetchInfo.get, (result: 
RemoteLogReadResult) => {
+            remoteFetchResult.complete(result)
+            delayedRemoteFetchPurgatory.checkAndComplete(key)
+          })
+        } catch {
+          // if the task queue of remote storage reader thread pool is full, 
return what we currently have
+          // (the data read from local log segment for the other 
topic-partitions) and an error for the topic-partition that
+          // we couldn't read from remote storage
+          case e: RejectedExecutionException =>
+            val fetchPartitionData = logReadResults.map { case (tp, result) =>
+              val r = {
+                if 
(tp.topicPartition().equals(remoteFetchInfo.get.topicPartition))
+                  createLogReadResult(e)
+                else
+                  result
+              }
+
+              tp -> r.toFetchPartitionData(false)
+            }
+            responseCallback(fetchPartitionData)
+            return
+        }
+
+        // If there is remote data, we will read remote data, instead of 
waiting for new data.
+        val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo.get,
+          fetchPartitionStatus, params, logReadResults, this, responseCallback)
+
+        delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
+      } else {
+        // If there is not enough data to respond and there is no remote data, 
we will let the fetch request
+        // to wait for new data.

Review Comment:
   let the fetch request to wait => let the fetch request wait 



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1320,6 +1405,33 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  private def createLogReadResult(highWatermark: Long,

Review Comment:
   Should this be in `object ReplicaManager`?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1320,6 +1405,33 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  private def createLogReadResult(highWatermark: Long,
+                          leaderLogStartOffset: Long,
+                          leaderLogEndOffset: Long,
+                          e: Throwable) = {
+    LogReadResult(info = new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+      divergingEpoch = None,
+      highWatermark,
+      leaderLogStartOffset,
+      leaderLogEndOffset,
+      followerLogStartOffset = -1L,
+      fetchTimeMs = -1L,
+      lastStableOffset = None,
+      exception = Some(e))
+  }
+
+  def createLogReadResult(e: Throwable): LogReadResult = {

Review Comment:
   Should this be in `object ReplicaManager`?
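   
   For reference, a minimal sketch of what that move could look like, assuming the `LogReadResult`/`FetchDataInfo` signatures already shown in this diff (the helper carries no instance state, so it could live under the companion object):
   ```scala
   object ReplicaManager {
     // Hypothetical placement: same body as the code this PR factors out of readFromLocalLog.
     def createLogReadResult(e: Throwable): LogReadResult = {
       LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
         divergingEpoch = None,
         highWatermark = UnifiedLog.UnknownOffset,
         leaderLogStartOffset = UnifiedLog.UnknownOffset,
         leaderLogEndOffset = UnifiedLog.UnknownOffset,
         followerLogStartOffset = UnifiedLog.UnknownOffset,
         fetchTimeMs = -1L,
         lastStableOffset = None,
         exception = Some(e))
     }
   }
   ```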



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = 
fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = 
epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+            // An empty record is sent instead of an incomplete batch when 
there is no minimum-one-message constraint
+            // and for FetchRequest version 3 and above and the first batch 
size is more than maximum bytes that can be sent.

Review Comment:
   This line doesn't read well.  Also, should it be put above the following 
line?
   
   ```
               int updatedFetchSize =
                       remoteStorageFetchInfo.minOneMessage && firstBatchSize > 
maxBytes ? firstBatchSize : maxBytes;
   
   ```



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1273,17 +1329,46 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>
+          // In case of offset out of range errors, check for remote log 
manager for non-compacted topics
+          // to fetch from remote storage. `log` instance should not be null 
here as that would have caught earlier with

Review Comment:
   that would have caught => that would have been caught



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1273,17 +1329,46 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>
+          // In case of offset out of range errors, check for remote log 
manager for non-compacted topics
+          // to fetch from remote storage. `log` instance should not be null 
here as that would have caught earlier with
+          // NotLeaderForPartitionException or ReplicaNotAvailableException.
+          // If it is from a follower then send the offset metadata only as 
the data is already available in remote
+          // storage.
+          if (remoteLogManager.isDefined && log != null && 
log.remoteLogEnabled() &&
+            // Check that the fetch offset is with in the offset range with in 
the remote storage layer.
+            log.logStartOffset <= offset && offset < 
log.localLogStartOffset()) {
+            // For follower fetch requests, throw an error saying that this 
offset is moved to tiered storage.
+            val highWatermark = log.highWatermark
+            val leaderLogStartOffset = log.logStartOffset
+            val leaderLogEndOffset = log.logEndOffset
+            if (params.isFromFollower) {
+              createLogReadResult(highWatermark, leaderLogStartOffset, 
leaderLogEndOffset,
+                new OffsetMovedToTieredStorageException("Given offset" + 
offset + " is moved to tiered storage"))
+            } else {
+              // Create a dummy FetchDataInfo with the remote storage fetch 
information.
+              // For the first topic-partition that needs remote data, we will 
use this information to read the data in another thread.
+              // For the following topic-partitions, we return an empty record 
set
+              val fetchDataInfo =
+                new FetchDataInfo(new 
LogOffsetMetadata(fetchInfo.fetchOffset), MemoryRecords.EMPTY, false, 
Optional.empty(),
+                  Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, 
minOneMessage, tp.topicPartition(),
+                    fetchInfo, params.isolation, params.hardMaxBytesLimit())))
+
+              LogReadResult(checkFetchDataInfo(partition, fetchDataInfo),
+                divergingEpoch = None,
+                highWatermark,
+                leaderLogStartOffset,
+                leaderLogEndOffset,
+                followerLogStartOffset,
+                time.milliseconds,

Review Comment:
   Could we reuse fetchTimeMs instead of calling time.milliseconds again?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> 
FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => 
TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the 
purgatory;
-      // this is because while the delayed fetch operation is being created, 
new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
   In the following code, we should go into that branch only if `remoteFetchInfo` is empty, right? Otherwise, we could get into a situation where a remote partition is never served because the fetch request is always satisfied by new local data on other partitions.
   ```
       if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= 
params.minBytes || errorReadingData ||
         hasDivergingEpoch || hasPreferredReadReplica) {
   ```
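   
   A hedged sketch of that adjustment, reusing the local names from this method (an illustration, not a definitive change):
   ```scala
   // Only short-circuit with the locally read data when no partition needs a remote read.
   if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty ||
       bytesReadable >= params.minBytes || errorReadingData ||
       hasDivergingEpoch || hasPreferredReadReplica)) {
     // respond immediately, as before
   }
   ```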



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,25 +622,208 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws 
RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = 
fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = 
log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = 
remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, 
epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = 
epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, 
offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+            // An empty record is sent instead of an incomplete batch when 
there is no minimum-one-message constraint
+            // and for FetchRequest version 3 and above and the first batch 
size is more than maximum bytes that can be sent.
+            int firstBatchSize = firstBatch.sizeInBytes();
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > 
maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, 
remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = 
addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, 
fetchDataInfo, logOptional.get());
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                                 RemoteLogSegmentMetadata 
segmentMetadata,
+                                                 FetchDataInfo fetchInfo,
+                                                 UnifiedLog log) throws 
RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = 
indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = 
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final Set<FetchResponseData.AbortedTransaction> abortedTransactions = 
new HashSet<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, 
segmentMetadata, accumulator, log);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions.isEmpty() ? 
Collections.emptyList() : new ArrayList<>(abortedTransactions)));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                            long upperBoundOffset,
+                                            RemoteLogSegmentMetadata 
segmentMetadata,
+                                            Consumer<List<AbortedTxn>> 
accumulator,
+                                            UnifiedLog log) throws 
RemoteStorageException {
+        Iterator<LogSegment> localLogSegments = 
JavaConverters.asJavaIterator(log.logSegments().iterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = 
Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = 
nextSegmentMetadataOpt.map(metadata -> 
indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = 
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = 
findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log);
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> 
indexCache.getIndexEntry(x).txnIndex());
+                    if (!txnIndexOpt.isPresent()) {
+                        searchInLocalLog = true;
+                    }
+                }
+
+                if (searchInLocalLog) {
+                    txnIndexOpt = (localLogSegments.hasNext()) ? 
Optional.of(localLogSegments.next().txnIndex()) : Optional.empty();
                 }
+            } else {
+                return;
+            }
+        }
+    }
+
+    private Optional<RemoteLogSegmentMetadata> 
findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata, UnifiedLog 
log) throws RemoteStorageException {
+        Option<LeaderEpochFileCache> leaderEpochFileCacheOption = 
log.leaderEpochCache();
+        if (leaderEpochFileCacheOption.isEmpty()) {
+            return Optional.empty();
+        }
+
+        TopicPartition topicPartition = 
segmentMetadata.topicIdPartition().topicPartition();
+        long nextSegmentBaseOffset = segmentMetadata.endOffset() + 1;
+        OptionalInt epoch = 
OptionalInt.of(segmentMetadata.segmentLeaderEpochs().lastEntry().getKey());

Review Comment:
   You mentioned the following in the other comment.
   `We should find the respective epoch for the target offset and use that to 
find the remote log segment metadata.`
   
   However, here, epoch seems to be for the offset before 
nextSegmentBaseOffset, not at nextSegmentBaseOffset?
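   
   A rough sketch of the alternative being suggested, reusing `epochForOffset` and `fetchRemoteLogSegmentMetadata` as they appear elsewhere in this class (an illustration, not the confirmed fix):
   ```java
   long nextSegmentBaseOffset = segmentMetadata.endOffset() + 1;
   // Resolve the epoch at nextSegmentBaseOffset itself instead of reusing the
   // last epoch recorded for the current segment.
   OptionalInt nextEpoch = leaderEpochFileCacheOption.get().epochForOffset(nextSegmentBaseOffset);
   return nextEpoch.isPresent()
           ? fetchRemoteLogSegmentMetadata(topicPartition, nextEpoch.getAsInt(), nextSegmentBaseOffset)
           : Optional.empty();
   ```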



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1273,17 +1329,46 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>
+          // In case of offset out of range errors, check for remote log 
manager for non-compacted topics
+          // to fetch from remote storage. `log` instance should not be null 
here as that would have caught earlier with
+          // NotLeaderForPartitionException or ReplicaNotAvailableException.
+          // If it is from a follower then send the offset metadata only as 
the data is already available in remote
+          // storage.
+          if (remoteLogManager.isDefined && log != null && 
log.remoteLogEnabled() &&
+            // Check that the fetch offset is with in the offset range with in 
the remote storage layer.

Review Comment:
   two occurrences
   with in => within



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && 
firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the 
batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new 
LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = 
addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), 
fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                         RemoteLogSegmentMetadata 
segmentMetadata,
+                                         FetchDataInfo fetchInfo) throws 
RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = 
indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = 
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = 
new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, 
segmentMetadata, accumulator);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                    long upperBoundOffset,
+                                    RemoteLogSegmentMetadata segmentMetadata,
+                                    Consumer<List<AbortedTxn>> accumulator) 
throws RemoteStorageException {
+        TopicPartition topicPartition = 
segmentMetadata.topicIdPartition().topicPartition();
+        Iterator<LogSegment> localLogSegments = fetchLog.apply(topicPartition)
+                .map(log -> JavaConverters.asJavaCollection(log.logSegments()))
+                .map(Collection::iterator)
+                .orElse(Collections.emptyIterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = 
Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = 
nextSegmentMetadataOpt.map(metadata -> 
indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = 
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = 
findNextSegmentMetadata(nextSegmentMetadataOpt.get());
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> 
indexCache.getIndexEntry(x).txnIndex());
+                    if (!txnIndexOpt.isPresent()) {
+                        searchInLocalLog = true;
+                    }
+                }
+
+                if (searchInLocalLog) {
+                    txnIndexOpt = (localLogSegments.hasNext()) ? 
Optional.of(localLogSegments.next().txnIndex()) : Optional.empty();

Review Comment:
   Where is the logic to remove the duplicates?
   
   Also, remote segments could overlap. Is there any logic to remove duplicates from remote segments as well?
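   
   For comparison, the other version of this method earlier in this review accumulates into a `Set`, which would drop exact duplicates across overlapping segments; a sketch of that approach (assuming `FetchResponseData.AbortedTransaction` equality dedupes repeated entries):
   ```java
   final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>();
   Consumer<List<AbortedTxn>> accumulator =
           abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                   .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
   ```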



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
