satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1172535684
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " +
+                    epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =

Review Comment:
   There is no risk here but it is good to be consistent with the local read pattern to return empty records for that case. Updated with the latest commit.
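
For readers skimming the thread, "that case" refers to the branch where the first batch found at or after the fetch offset is larger than the computed maxBytes budget. Below is a rough sketch of the empty-records handling the comment describes; it reuses the local variables from the diff above, assumes a minOneMessage flag on RemoteStorageFetchInfo, and is an illustration only, not the exact code that landed in the follow-up commit.

    // Illustrative continuation of read(...) above, not the PR's final code.
    // remoteStorageFetchInfo.minOneMessage is an assumed flag for this sketch.
    int firstBatchSize = firstBatch.sizeInBytes();
    if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes) {
        // Return empty records, as the review comment suggests, rather than a batch
        // that exceeds the requested fetch size; the consumer can adjust and retry.
        return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
                includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
    }
    // Otherwise honor minOneMessage by letting the first batch exceed maxBytes if needed.
    int updatedFetchSize = firstBatchSize > maxBytes ? firstBatchSize : maxBytes;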