hachikuji commented on a change in pull request #9756:
URL: https://github.com/apache/kafka/pull/9756#discussion_r560627883
##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
##########
@@ -37,6 +37,7 @@
private final Time time;
private final SimpleTimer lingerTimer;
private final int lingerMs;
+ private final int minFlushSize;
Review comment:
> I am slightly confused about the advantage of letting the config be
set higher than maxBatchSize (1MB) and then limiting the effective batch size to
the minimum of maxUnflushedBytes and maxBatchSize. Won't that lead to confusion
in terms of usage?
Not sure if this is the source of the confusion, but to be clear, what
I'm calling the batch size is just a limit on the size of a
`DefaultRecordBatch`. Records in Kafka are grouped into batches with a
schema roughly like this:
```
Batch => Size Offset Crc ... [Record]
```
It is useful to keep batches from getting too large because we don't have a
way to index into a batch without storing the whole decompressed contents in
memory.
However, we can let `maxUnflushedBytes` be as large as we want. That just
means that we will collect multiple batches (i.e. multiple `CompletedBatch`
instances), as the code already does. So if `maxUnflushedBytes` is set to 4MB,
we will end up collecting four 1MB batches before we flush to disk rather
than one big 4MB batch.
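A minimal sketch of that idea, assuming a hypothetical `UnflushedBatchesSketch` class that tracks only byte counts (it is not the real `BatchAccumulator` API and ignores record encoding and compression): each batch is capped at `maxBatchSize`, and the accumulator keeps closing batches and starting new ones until the total unflushed bytes reach `maxUnflushedBytes`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative, not Kafka's BatchAccumulator API.
// Each batch is capped at maxBatchSize, while maxUnflushedBytes bounds the total
// bytes buffered across all batches before a flush is due.
class UnflushedBatchesSketch {
    private final int maxBatchSize;
    private final int maxUnflushedBytes;
    // Sizes of batches that are closed and waiting to be flushed.
    private final List<Integer> completedBatchSizes = new ArrayList<>();
    // Bytes in the batch currently being filled.
    private int currentBatchBytes = 0;

    UnflushedBatchesSketch(int maxBatchSize, int maxUnflushedBytes) {
        this.maxBatchSize = maxBatchSize;
        this.maxUnflushedBytes = maxUnflushedBytes;
    }

    /** Appends a record of the given size; returns true once a flush is due. */
    boolean append(int recordBytes) {
        if (recordBytes > maxBatchSize) {
            throw new IllegalArgumentException("record larger than max batch size");
        }
        // Start a new batch rather than growing the current one past the cap.
        if (currentBatchBytes + recordBytes > maxBatchSize) {
            completeCurrentBatch();
        }
        currentBatchBytes += recordBytes;
        return unflushedBytes() >= maxUnflushedBytes;
    }

    /** Total bytes across closed batches plus the open batch. */
    int unflushedBytes() {
        int total = currentBatchBytes;
        for (int size : completedBatchSizes) {
            total += size;
        }
        return total;
    }

    /** Closes the open batch and returns the sizes of all batches to be written. */
    List<Integer> drain() {
        completeCurrentBatch();
        List<Integer> drained = new ArrayList<>(completedBatchSizes);
        completedBatchSizes.clear();
        return drained;
    }

    private void completeCurrentBatch() {
        if (currentBatchBytes > 0) {
            completedBatchSizes.add(currentBatchBytes);
            currentBatchBytes = 0;
        }
    }
}
```

With a 1MB batch cap, a 4MB `maxUnflushedBytes`, and 256KB records, the flush becomes due on the 16th append and `drain()` yields four 1MB batches, matching the 4x1MB example above.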
We could also require that `maxUnflushedBytes` be less than or equal to our
desired max batch size, but if supporting the more general config takes a
similar level of effort, then that seems better.
> Also @hachikuji, I wanted to understand how the newly proposed config,
`quorum.append.max.linger.ms`, would interplay with the existing
`quorum.append.linger.ms` config. As I understand it, the moment
`quorum.append.linger.ms` is crossed, the flush starts. This happens even in
this new implementation, irrespective of whether maxUnflushedBytes is hit. Are
you suggesting that we still hold onto writes until we hit
`quorum.append.max.linger.ms`, thereby overriding `quorum.append.linger.ms`?
My expectation is that we flush as soon as either of these is reached. So if the
linger time is hit first, then we take whatever unflushed bytes we have, even if
they are fewer than `maxUnflushedBytes`. On the other hand, if
`maxUnflushedBytes` is reached first, then we ignore linger. I think this is
consistent with the implementation I suggested here:
https://github.com/apache/kafka/pull/9756#discussion_r552268919. Basically, we
override `timeUntilDrain` so that it returns 0 when `maxUnflushedBytes` is
reached.
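A rough sketch of that drain condition, assuming a hypothetical `DrainPolicySketch` class that takes the current time as an explicit `nowMs` argument instead of using Kafka's `Time`/`SimpleTimer` (this is not the actual `BatchAccumulator` code): `timeUntilDrain` returns 0 once `maxUnflushedBytes` is reached, and otherwise returns whatever is left of the linger window started by the first unflushed write.

```java
// Hypothetical sketch of the drain condition; not Kafka's actual BatchAccumulator.
class DrainPolicySketch {
    private final long lingerMs;
    private final int maxUnflushedBytes;
    // Deadline set by the first unflushed write; -1 means no linger timer is running.
    private long lingerDeadlineMs = -1;
    private int unflushedBytes = 0;

    DrainPolicySketch(long lingerMs, int maxUnflushedBytes) {
        this.lingerMs = lingerMs;
        this.maxUnflushedBytes = maxUnflushedBytes;
    }

    void append(int recordBytes, long nowMs) {
        if (lingerDeadlineMs < 0) {
            // The first write after a drain starts the linger timer.
            lingerDeadlineMs = nowMs + lingerMs;
        }
        unflushedBytes += recordBytes;
    }

    /** Returns 0 if a drain is due now, else ms until linger expires (MAX_VALUE if idle). */
    long timeUntilDrain(long nowMs) {
        if (unflushedBytes >= maxUnflushedBytes) {
            return 0; // size threshold reached: ignore linger
        }
        if (lingerDeadlineMs < 0) {
            return Long.MAX_VALUE; // nothing buffered, nothing to drain
        }
        // Linger expiry drains whatever is buffered, even below maxUnflushedBytes.
        return Math.max(0, lingerDeadlineMs - nowMs);
    }

    /** Drains all buffered bytes and resets the linger timer. */
    int drain() {
        int drained = unflushedBytes;
        unflushedBytes = 0;
        lingerDeadlineMs = -1;
        return drained;
    }
}
```

Whichever condition fires first wins: a small write drains when linger expires, while a burst that crosses `maxUnflushedBytes` drains immediately without waiting for the linger window.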
Hope that helps!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.