Hi,

## Questions

1. Is there any way to reliably avoid batch splits or oversized batches?
2. Has there been any progress or discussion on fixing the issue described in the following PR? https://github.com/apache/kafka/pull/6469 (kafka#6469)

## Background

`FlinkKafkaProducer` expects that the callbacks for all sent records have been executed by the time `KafkaProducer#flush` returns while it is taking a snapshot. However, this does not hold when a batch split happens, as explained in PR kafka#6469.

Because of this, we are observing an `IllegalStateException` in our Flink deployment:
https://github.com/apache/flink/blob/release-1.9.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L960

We are looking for a workaround and a resolution for this issue.

Possible workaround/resolution ideas:

* Avoid batch splits
* Make Flink wait somehow until in-flight requests are completed
* Make `KafkaProducer#flush` block until in-flight requests are completed, even when a batch is split and re-enqueued

Thanks,
Tomoyuki
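
P.S. For illustration only, here is a minimal sketch of what the "avoid batch split" idea might look like on the producer side. It assumes that a split is triggered when the broker rejects a multi-record batch as too large, and that this mostly happens when the compressed size of a batch was underestimated. Under that assumption, keeping `batch.size` well below the broker limit and disabling compression should make splits less likely, but it is not a guarantee. The class name `ConservativeProducerConfig`, the parameter `brokerMaxMessageBytes`, and the 50% headroom are hypothetical; only the property keys `batch.size` and `compression.type` are actual Kafka producer settings.

```java
import java.util.Properties;

public class ConservativeProducerConfig {

    /**
     * Builds producer properties intended to make "message too large"
     * rejections, and therefore batch splits, less likely.
     *
     * @param brokerMaxMessageBytes assumed value of the broker's message.max.bytes
     *                              (or the topic-level max.message.bytes)
     */
    public static Properties build(int brokerMaxMessageBytes) {
        if (brokerMaxMessageBytes <= 0) {
            throw new IllegalArgumentException("brokerMaxMessageBytes must be positive");
        }
        Properties props = new Properties();

        // Keep the target batch size well below the broker limit.
        // Half of the limit is an arbitrary headroom chosen for this sketch.
        int batchSize = brokerMaxMessageBytes / 2;
        props.setProperty("batch.size", Integer.toString(batchSize));

        // Without compression, the producer does not depend on an estimated
        // compression ratio when deciding whether a batch is full.
        props.setProperty("compression.type", "none");

        return props;
    }

    public static void main(String[] args) {
        // 1_000_000 is a placeholder; use the limit configured on your brokers.
        Properties props = build(1_000_000);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be merged into the `Properties` object passed to the producer. Disabling compression trades throughput and network usage for predictability, so this is at best a stopgap rather than a fix for the underlying `flush` behavior.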