jsancio commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r566237380



##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -474,6 +474,7 @@ public void testAccumulatorClearedAfterBecomingFollower() 
throws Exception {
             .thenReturn(buffer);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withMaxUnflushedBytes(KafkaRaftClient.MAX_BATCH_SIZE)

Review comment:
       This comment applies to a few places in this file.
   
   Is there a reason why we override this value?

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void 
testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
         assertEquals(3L, context.log.endOffset().offset);
     }
 
+    @Test
+    public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws 
Exception {
+        // This test verifies that the client will get woken up immediately
+        // if the linger timeout has expired during an append

Review comment:
       > "if the linger timeout..."
   
   Did you mean minimum flush bytes?

##########
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1078,6 +1078,7 @@ private static FetchResponseData snapshotFetchResponse(
     private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext 
context, RawSnapshotWriter snapshot) {
         return new SnapshotWriter<>(
             snapshot,
+            1024,

Review comment:
       In the snapshot tests, we can set the `maxUnflushedBytes` to the same 
value as `maxBatchSize`.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";
     public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = 
QUORUM_PREFIX + "append.max.unflushed.bytes";
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_DOC = "The 
maximum number of bytes that the leader " +
+        "will allow to be accumulated before flushing them to disk ";

Review comment:
       ```suggestion
       public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_DOC = "The 
maximum number of bytes that the leader " +
           "will allow to be accumulated before appending to the topic 
partition.";
   ```

##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -617,6 +620,38 @@ public void 
testChannelWokenUpIfLingerTimeoutReachedDuringAppend() throws Except
         assertEquals(3L, context.log.endOffset().offset);
     }
 
+    @Test
+    public void testChannelWokenUpIfMinFlushSizeReachedDuringAppend() throws 
Exception {
+        // This test verifies that the client will get woken up immediately
+        // if the linger timeout has expired during an append
+
+        int localId = 0;
+        int otherNodeId = 1;
+        int minFlushSizeInBytes = 120;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withMaxUnflushedBytes(minFlushSizeInBytes)

Review comment:
       The local variable is called `minFlushSizeInBytes` yet the 
`KafkaRaftClient` uses `maxUnflushBytes`. Did we agree to use 
`minUnflushedBytes`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";

Review comment:
       Outside the scope of this PR but how about changing this description to:
   > The duration in milliseconds that the leader will wait for writes to 
accumulate before appending to the topic partition.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -76,6 +76,11 @@
         "wait for writes to accumulate before flushing them to disk.";
     public static final int DEFAULT_QUORUM_LINGER_MS = 25;
 
+    public static final String QUORUM_APPEND_MAX_UNFLUSHED_BYTES_CONFIG = 
QUORUM_PREFIX + "append.max.unflushed.bytes";

Review comment:
       @vamossagar12 and @hachikuji  How about we call this 
`append.linger.bytes`?
   
   Excuse the back and forth on this name put let me explain.
   
   In the current implementation this does determine when the bytes will be 
flush because the `KafkaRaftClient` flushes the `Log` every time it drains the 
`BatchAccumulator` and it appends to the log.
   
   
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1871-L1879
   
   In the past, we have talked about delaying the flush/fsync to the log. I 
believe that the invariant that we need to satisfied is that the leader cannot 
increase the high-watermark past the flushed offsets. Followers need to 
flush/fsync before sending a Fetch request since the leader assumes that the 
followers have safely replicated the offset in the Fetch request, 
   
   If in the future we implement this logic or something similar, I think the 
name `append.max.unflushed.bytes` would not be accurate since it contains the 
word "unflushed". And the leader may decide to have more than 
`append.max.unflushed.bytes` bytes unflushed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to