jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r555386153
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -970,40 +996,98 @@ private FetchResponseData tryCompleteFetchRequest( long fetchOffset = request.fetchOffset(); int lastFetchedEpoch = request.lastFetchedEpoch(); LeaderState state = quorum.leaderStateOrThrow(); - Optional<OffsetAndEpoch> divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - - if (divergingEpochOpt.isPresent()) { - Optional<FetchResponseData.EpochEndOffset> divergingEpoch = - divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() - .setEpoch(offsetAndEpoch.epoch) - .setEndOffset(offsetAndEpoch.offset)); - return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); - } else { + ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); + + final Records records; + if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { onUpdateLeaderHighWatermark(state, currentTimeMs); } - return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); + records = info.records; + } else { + records = MemoryRecords.EMPTY; } + + return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. 
*/ - private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { - if (fetchOffset == 0 && lastFetchedEpoch == 0) { - return Optional.empty(); + private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { + if (log.startOffset() == 0 && fetchOffset == 0) { + if (lastFetchedEpoch != 0) { + logger.warn( + "Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", + fetchOffset, + lastFetchedEpoch + ); + } + return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); + } + + if (fetchOffset < log.startOffset() || fetchOffset == log.startOffset()) { + // Snapshot must be present if start offset is non zero. + OffsetAndEpoch latestSnapshotId = log.latestSnapshotId().orElseThrow(() -> { + return new IllegalStateException( + String.format( + "The log start offset (%s) was greater than zero but no snapshot was found", + log.startOffset() + ) + ); + }); + + if (fetchOffset < log.startOffset() || lastFetchedEpoch != latestSnapshotId.epoch) { Review comment: Yeah. I think this code was wrong. I changed the `ReplicatedLog` interface to have two snapshots `latestSnapshotId` and `startSnapshotId`. We need this for a couple of reasons. One of the reasons is that `latestSnapshotId` is changed concurrently by the state machine, while `startSnapshotId` is changed by the polling thread. Another reason is that increasing the log start offset will be delayed based on what is described in https://issues.apache.org/jira/browse/KAFKA-12155. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org