ahuang98 commented on code in PR #19800: URL: https://github.com/apache/kafka/pull/19800#discussion_r2112701439
########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -264,13 +267,15 @@ public short partitionRecordVersion() { } public short fetchRequestVersion() { - if (this.isAtLeast(IBP_3_9_IV0)) { + if (isAtLeast(IBP_4_1_IV1)) { + return 18; Review Comment: should have thought about this more during the KIP review - can you remind me why the default value of the new HW field would ever be used? if the fetch request version to use is dictated by MV, then shouldn't all nodes agree on which fetch request/response version to use? ########## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ########## @@ -116,6 +116,9 @@ public enum MetadataVersion { // Streams groups are early access in 4.1 (KIP-1071). IBP_4_1_IV0(26, "4.1", "IV0", false), + // Send FETCH verion 18 in the replica fetcher (KIP-1166) Review Comment: nit: "version" ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -1556,26 +1572,25 @@ private CompletableFuture<FetchResponseData> handleFetchRequest( Optional.empty() ); } - } - - // FIXME: `completionTimeMs`, which can be null - logger.trace( - "Completing delayed fetch from {} starting at offset {} at {}", - replicaKey, - fetchPartition.fetchOffset(), - completionTimeMs - ); + } else { + logger.trace( + "Completing delayed fetch from {} starting at offset {} at {}", + replicaKey, + fetchPartition.fetchOffset(), + completionTimeMs + ); - // It is safe to call tryCompleteFetchRequest because only the polling thread completes this - // future successfully. This is true because only the polling thread appends record batches to - // the log from maybeAppendBatches. - return tryCompleteFetchRequest( - requestMetadata.listenerName(), - requestMetadata.apiVersion(), - replicaKey, - fetchPartition, - time.milliseconds() - ); + // It is safe to call tryCompleteFetchRequest because only the polling thread completes this + // future successfully. 
This is true because only the polling thread appends record batches to + // the log from maybeAppendBatches. + return tryCompleteFetchRequest( + requestMetadata.listenerName(), + requestMetadata.apiVersion(), + replicaKey, + fetchPartition, + completionTimeMs Review Comment: nice catch ########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -386,6 +385,11 @@ private void onUpdateLeaderHighWatermark( // records still held in memory directly to the listener appendPurgatory.maybeComplete(highWatermark.offset(), currentTimeMs); + // After updating the high-watermark, complete all of the deferred + // fetch requests. This is always correct because all fetch request + // deffered have a HWM less or equal to the previous leader's HWM. Review Comment: nit: typo in the last comment line, "deffered" should be "deferred" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org