jsancio commented on a change in pull request #9756: URL: https://github.com/apache/kafka/pull/9756#discussion_r566237380
########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() throws Exception {
 .thenReturn(buffer);
 RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE)

Review comment:
This comment applies to a few places in this file. Is there a reason why we override this value?

########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
 assertEquals(3L, context.log.endOffset().offset);
 }
+ @Test
+ public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+ // This test verifies that the client will get woken up immediately
+ // if the linger timeout has expired during an append

Review comment:
> "if the linger timeout..."

Did you mean minimum flush bytes?

########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse(
 private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
 return new SnapshotWriter<>(
 snapshot,
+ 1024,

Review comment:
In the snapshot tests, we can set the `maxUnflushedBytes` to the same value as `maxBatchSize`.
########## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
+ public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";
+ public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_DOC = "The maximum number of bytes that the leader " +
+ "will allow to be accumulated before flushing them to disk ";

Review comment:
```suggestion
public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_DOC = "The maximum number of bytes that the leader " +
"will allow to be accumulated before appending to the topic partition.";
```

########## File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
 assertEquals(3L, context.log.endOffset().offset);
 }
+ @Test
+ public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws Exception {
+ // This test verifies that the client will get woken up immediately
+ // if the linger timeout has expired during an append
+
+ int localId = 0;
+ int otherNodeId = 1;
+ int minFlushSizeInBytes = 120;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .withMaxUnflushedBytes(minFlushSizeInBytes)

Review comment:
The local variable is called `minFlushSizeInBytes` yet the `KafkaRaftClient` uses `maxUnflushBytes`. Did we agree to use `minUnflushedBytes`?
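The distinction raised in the test comments, waking up because the accumulated bytes crossed the threshold rather than because the linger timeout expired, can be illustrated with a small standalone sketch. `LingerByteAccumulator` and all of its methods are hypothetical names, not Kafka's `BatchAccumulator` API; the 25 ms linger mirrors `DEFAULT_QUORUM_LINGER_MS` and the 120-byte threshold mirrors the test's `minFlushSizeInBytes`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka's BatchAccumulator: buffers appended records and
// reports when they should be drained, either because the oldest record has lingered
// long enough or because the buffered bytes reached the configured threshold.
public class LingerByteAccumulator {
    private final long lingerMs;
    private final int maxUnflushedBytes;   // the threshold whose name is under discussion
    private final List<byte[]> pending = new ArrayList<>();
    private int pendingBytes = 0;
    private long firstAppendMs = -1;       // -1 while nothing is buffered

    public LingerByteAccumulator(long lingerMs, int maxUnflushedBytes) {
        this.lingerMs = lingerMs;
        this.maxUnflushedBytes = maxUnflushedBytes;
    }

    // Buffers a record; returns true if the caller should wake up and drain right away.
    public boolean append(byte[] record, long nowMs) {
        if (pending.isEmpty()) {
            firstAppendMs = nowMs;
        }
        pending.add(record);
        pendingBytes += record.length;
        return needsDrain(nowMs);
    }

    // Drain when the byte threshold is reached OR the linger time has elapsed.
    public boolean needsDrain(long nowMs) {
        if (pending.isEmpty()) {
            return false;
        }
        return pendingBytes >= maxUnflushedBytes || nowMs - firstAppendMs >= lingerMs;
    }

    // Returns everything buffered so far and resets the accumulator.
    public List<byte[]> drain() {
        List<byte[]> drained = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        firstAppendMs = -1;
        return drained;
    }

    public int pendingBytes() {
        return pendingBytes;
    }

    public static void main(String[] args) {
        LingerByteAccumulator acc = new LingerByteAccumulator(25, 120);
        System.out.println(acc.append(new byte[50], 0));  // false: 50 bytes, 0 ms elapsed
        System.out.println(acc.append(new byte[50], 5));  // false: 100 bytes, 5 ms elapsed
        System.out.println(acc.append(new byte[50], 10)); // true: 150 >= 120 before the linger expires
        System.out.println(acc.drain().size());           // 3
    }
}
```

In this sketch the same number reads as a maximum from the buffer's point of view and as a minimum from the drain trigger's point of view, which may be why both `min` and `max` names came up.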
########## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";

Review comment:
Outside the scope of this PR, but how about changing this description to:

> The duration in milliseconds that the leader will wait for writes to accumulate before appending to the topic partition.

########## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
 "wait for writes to accumulate before flushing them to disk.";
 public static final int DEFAULT_QUORUM_LINGER_MS = 25;
+ public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
@vamossagar12 and @hachikuji, how about we call this `append.linger.bytes`? Excuse the back and forth on this name, but let me explain. In the current implementation, this does determine when the bytes will be flushed, because the `KafkaRaftClient` flushes the `Log` every time it drains the `BatchAccumulator` and appends to the log.

https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1871-L1879

In the past, we have talked about delaying the flush/fsync to the log. I believe the invariant that we need to satisfy is that the leader cannot increase the high-watermark past the flushed offsets. Followers need to flush/fsync before sending a Fetch request, since the leader assumes that the followers have safely replicated the offset in the Fetch request. If in the future we implement this logic or something similar, I think the name `append.max.unflushed.bytes` would not be accurate, since it contains the word "unflushed" and the leader may decide to have more than `append.max.unflushed.bytes` bytes unflushed.
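The flush invariant described in the naming comment can be sketched as a high-watermark update rule. This is a hypothetical illustration, not code from `KafkaRaftClient`: `FlushAwareHighWatermark` and `nextHighWatermark` are made-up names, follower offsets are assumed to come from Fetch requests that followers send only after flushing, and the leader contributes its flushed offset rather than its log end offset. It takes a conservative reading of "past the flushed offsets" and also caps the result at the leader's own flushed offset.

```java
import java.util.Arrays;

// Hypothetical sketch of the delayed-fsync invariant; not KafkaRaftClient code.
public class FlushAwareHighWatermark {

    // followerFetchOffsets: offsets from each follower's latest Fetch request. Followers
    //   are assumed to flush/fsync before fetching, so these offsets are durable.
    // leaderFlushedOffset: offset up to which the leader has flushed its own log; with a
    //   delayed fsync this may lag the leader's log end offset.
    public static long nextHighWatermark(long currentHighWatermark,
                                         long leaderFlushedOffset,
                                         long[] followerFetchOffsets) {
        long[] durable = new long[followerFetchOffsets.length + 1];
        durable[0] = leaderFlushedOffset;
        System.arraycopy(followerFetchOffsets, 0, durable, 1, followerFetchOffsets.length);
        Arrays.sort(durable); // ascending

        // Largest offset that at least a majority of voters have flushed.
        int majority = durable.length / 2 + 1;
        long majorityFlushed = durable[durable.length - majority];

        // Conservative reading: never advance past the leader's own flushed offset.
        long candidate = Math.min(majorityFlushed, leaderFlushedOffset);

        // The high-watermark never moves backwards.
        return Math.max(currentHighWatermark, candidate);
    }

    public static void main(String[] args) {
        // Three voters; the leader has appended further than it has flushed (80).
        System.out.println(nextHighWatermark(50, 80, new long[] {90, 100}));  // 80
        // Leader flushed to 100, followers at 90 and 60: a majority has flushed 90.
        System.out.println(nextHighWatermark(80, 100, new long[] {90, 60}));  // 90
        // Followers lag behind the current high-watermark: it stays at 70.
        System.out.println(nextHighWatermark(70, 100, new long[] {40, 60}));  // 70
    }
}
```

Under this rule, delaying the leader's fsync only holds back the high-watermark; it does not bound how many bytes the leader keeps unflushed, which is the mismatch with the word "unflushed" that the comment points out.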