soarez commented on code in PR #12752:
URL: https://github.com/apache/kafka/pull/12752#discussion_r996346998
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##########
@@ -127,7 +127,9 @@ public RecordAccumulator(LogContext logContext,
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
- this.batchSize = batchSize;
+ // As per Kafka producer configuration documentation batch.size may be set to 0 to explicitly disable
+ // batching which in practice actually means using a batch size of 1.
+ this.batchSize = Math.max(1, batchSize);
Review Comment:
Is there a reason why we'd prefer that? I only see disadvantages in
sanitizing this value there:
1. In `KafkaProducer` the only other use of this config is in `BufferPool`,
which has not changed for a long while. The breaking behavior stems from recent
changes that are localized to the `RecordAccumulator` and the
`BuiltinPartitioner`. It does seem like `BufferPool` will work the same with
a buffer size of either 0 or 1, but still, **if we focus this fix on the components
where the bug was introduced, we avoid causing further unintentional changes to
previous behavior**.
2. If the change is made in `RecordAccumulator`, **we can use the partition
change callbacks to test for this bug**. To detect this bug with a test against
`KafkaProducer` we'd need to execute `producer.send(record)` in a separate
thread under a timeout, which isn't as nice.
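
To illustrate what the "separate thread under a timeout" approach from point 2 would involve, here is a rough sketch. It deliberately uses no Kafka classes: `completesWithin` and the two `Runnable` stand-ins are hypothetical names made up for this example, and the real test would pass a call to `producer.send(record)` instead.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendTimeoutSketch {

    /**
     * Runs the given call on a separate thread and reports whether it
     * finished within the timeout. The call is a hypothetical stand-in
     * for producer.send(record).
     */
    public static boolean completesWithin(Runnable call, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "send-under-test");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(call);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that returns promptly.
        Runnable healthySend = () -> { };
        // Stand-in for a send that never returns on its own.
        Runnable stuckSend = () -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("healthy send completed: " + completesWithin(healthySend, 1_000));
        System.out.println("stuck send completed: " + completesWithin(stuckSend, 200));
    }
}
```

The extra executor, the timeout tuning, and the cleanup of a possibly stuck thread are the overhead that a callback-based test against `RecordAccumulator` would avoid.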