Hello Tomoyuki,

It seems that the issue in 6469 is indeed valid, and I'd personally suggest we go with option 3) to fix the flush() behavior. Please feel free to create a JIRA (and also submit your PR if you are interested in contributing :).

Guozhang

On Sat, Dec 7, 2019 at 7:59 AM Tomoyuki Saito <aocch...@gmail.com> wrote:

> Hi,
>
> ## Questions
>
> 1. Any possible way to make sure to avoid batch split, or oversized
> batches?
> 2. Any progress/discussion to fix the issue mentioned in the following
> PR: https://github.com/apache/kafka/pull/6469 (kafka#6469)
>
> ## Background
>
> `FlinkKafkaProducer` expects that callbacks for sent records will be
> executed after `KafkaProducer#flush`, while executing snapshot. But,
> it does not hold when batch split happens, as explained in PR
> kafka#6469. We are observing IllegalStateException for this issue,
> with Flink deployment.
>
> https://github.com/apache/flink/blob/release-1.9.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L960
>
> We are looking for a workaround and a resolution for this issue.
>
> Possible workaround/resolution ideas:
> * Avoid batch split
> * Make Flink wait for inflight requests to be completed somehow
> * Make `KafkaProducer#flush` block until inflight requests are
> completed, even when batch split and reenqueue happens
>
> Thanks,
> Tomoyuki
>

--
-- Guozhang
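
For the second workaround idea in the quoted message (making the application wait for in-flight requests itself), here is a minimal Java sketch. `PendingRecordTracker` and its method names are hypothetical and are not part of Kafka or Flink. It also assumes that the send callbacks of records in a split batch still fire once the re-enqueued batches complete, which should be verified against the producer version in use.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: counts records whose send callback has not fired yet. */
final class PendingRecordTracker {
    private long pending = 0;

    /** Call right before handing a record to the producer. */
    public synchronized void recordSent() {
        pending++;
    }

    /** Call from the send callback, on success or on failure. */
    public synchronized void callbackCompleted() {
        pending--;
        if (pending == 0) {
            notifyAll(); // wake up any thread blocked in awaitAllCallbacks
        }
    }

    /** Number of records still waiting for their callback. */
    public synchronized long pending() {
        return pending;
    }

    /** Call after flush(): blocks until every callback has fired, or times out. */
    public synchronized void awaitAllCallbacks(long timeoutMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (pending > 0) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException(pending + " callbacks still pending");
            }
            // Releases the monitor while waiting, so callbacks can make progress.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
    }
}
```

In an application, `recordSent()` would be called before `producer.send(record, callback)`, `callbackCompleted()` inside the callback, and `awaitAllCallbacks(...)` right after `producer.flush()`, so that a snapshot proceeds only once the count has reached zero.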