JimmyZZZ commented on code in PR #107:
URL: https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1670212145
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
      * Logic needs to be invoked by write AND flush since we support various semantics.
      */
     private void checkAsyncException() throws IOException {
-        // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
-
-            asyncProducerException = null;
+            // In old version, asyncProducerException will be set to null here, which causes another
+            // bug [FLINK-35749]
+            // once asyncProducerException is set to null here, the MailboxExecutor thread in
+            // WriterCallback.onCompletion may also invoke this method checkAsyncException and get
+            // chance to make
+            // asyncProducerException be null before the function flush() and write() seeing the
+            // exception.
+            // After that, the flush and writer functions can continue to write next message batch
+            // to kafka, the old
+            // in-flight message batch with exceptions get lost.
+            // We hope that once some exceptions thrown during sending kafka, KafkaWriter could be
+            // fast-failed and
+            // trigger global restarting, then asyncProducerException will be initialized as null
+            // again.

Review Comment:
   Very good point that a "data engineer will be paged in the middle of the night" once alerts fire on changes in the exception metrics. Given that, I think we need to be very cautious with any further modifications to the exception logic. The new test KafkaWriterFaultToleranceITCase can help guard against this concern.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
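
For illustration only, here is a minimal sketch of the race described in the diff comment above. It is not the actual KafkaWriter code: the names `AsyncExceptionSketch`, `ExceptionHolder`, `recordSendFailure`, and the `resetOnCheck` flag are hypothetical, and the two sequential calls merely stand in for the mailbox callback and a later flush()/write(). It assumes the old behavior cleared the field on the first check and the new behavior leaves it set.

```java
import java.io.IOException;

/** Hypothetical stand-in for the exception handling discussed in the diff; not the real KafkaWriter. */
public class AsyncExceptionSketch {

    /** Holds an asynchronously reported send failure, optionally clearing it on the first check. */
    static final class ExceptionHolder {
        private final boolean resetOnCheck;
        private volatile Exception asyncProducerException;

        ExceptionHolder(boolean resetOnCheck) {
            this.resetOnCheck = resetOnCheck;
        }

        /** Stands in for the producer callback recording a failed send. */
        void recordSendFailure(Exception e) {
            asyncProducerException = e;
        }

        /** Throws if a failure was recorded; with resetOnCheck, only the first caller sees it. */
        void checkAsyncException() throws IOException {
            Exception e = asyncProducerException;
            if (e != null) {
                if (resetOnCheck) {
                    // Old behavior (assumed): the first observer clears the failure.
                    asyncProducerException = null;
                }
                throw new IOException("async send failed", e);
            }
        }
    }

    /** Returns true if a second check (standing in for flush()/write()) still sees the failure. */
    public static boolean secondCheckSeesFailure(boolean resetOnCheck) {
        ExceptionHolder holder = new ExceptionHolder(resetOnCheck);
        holder.recordSendFailure(new RuntimeException("simulated send failure"));
        try {
            // First observer, e.g. the check scheduled from the callback.
            holder.checkAsyncException();
        } catch (IOException expected) {
            // Swallowed here only to model the failure being consumed elsewhere.
        }
        try {
            // Second observer, e.g. flush() or write().
            holder.checkAsyncException();
            return false; // failure was already cleared, so it goes unnoticed
        } catch (IOException e) {
            return true; // failure is still visible
        }
    }

    public static void main(String[] args) {
        System.out.println("reset on check: " + secondCheckSeesFailure(true));
        System.out.println("sticky: " + secondCheckSeesFailure(false));
    }
}
```

With the reset variant the second check returns normally, which corresponds to the in-flight batch being lost silently. With the sticky variant every later check keeps failing until the holder is recreated, which matches the fail-fast intent stated in the diff comment.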