AHeise commented on code in PR #107: URL: https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1666693262
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########

@@ -449,12 +460,17 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
         }

         // Checking for exceptions from previous writes
-        mailboxExecutor.submit(
+        // Notice: throwing exception in mailboxExecutor thread is not safe enough for
+        // triggering global fail over, which has been fixed in [FLINK-31305]. And using
+        // mailboxExecutor.execute() is better than mailboxExecutor.submit() since submit
+        // will swallow exceptions inside.
+        mailboxExecutor.execute(

Review Comment:
Yes, this is the actual fix: using `execute` instead of `submit`. This bug was introduced through FLINK-31305.

Note: `submit` does not swallow exceptions. It's an anti-pattern to use `submit` without looking at the returned `Future` — the future holds the result/exception so that it can be handled asynchronously somewhere. If we bubble up the exception instead, the task dies without the call site being able to react to it properly. So please remove the second part of your comment, from "And using ..." onwards.

I will amend the javadoc of the mailbox executor to make it clear that the executor indeed does not bubble up exceptions in `submit`.
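The `submit` vs. `execute` distinction the reviewer draws can be demonstrated with a plain `java.util.concurrent` executor standing in for Flink's mailbox executor (this is an illustrative sketch, not Flink code): `submit` wraps the task in a `FutureTask`, so an exception is captured inside the `Future` and never reaches the thread's uncaught-exception handler, while `execute` lets it escape the task.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

public class SubmitVsExecute {
    public static void main(String[] args) throws Exception {
        // submit: the exception is captured in the Future and only surfaces
        // when someone calls get(); nothing escapes the worker thread.
        ExecutorService submitExec = Executors.newSingleThreadExecutor();
        Future<?> future = submitExec.submit((Runnable) () -> {
            throw new IllegalStateException("lost if the Future is ignored");
        });
        try {
            future.get();
        } catch (ExecutionException e) {
            System.out.println("submit: only visible via Future: " + e.getCause().getMessage());
        }
        submitExec.shutdown();

        // execute: the exception escapes the Runnable and reaches the thread's
        // uncaught exception handler, i.e. it actually bubbles up and can fail the task.
        AtomicReference<Throwable> uncaught = new AtomicReference<>();
        CountDownLatch seen = new CountDownLatch(1);
        ExecutorService executeExec = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, e) -> {
                uncaught.set(e);
                seen.countDown();
            });
            return t;
        });
        executeExec.execute(() -> {
            throw new IllegalStateException("bubbles up");
        });
        seen.await(5, TimeUnit.SECONDS);
        System.out.println("execute: uncaught handler saw: " + uncaught.get().getMessage());
        executeExec.shutdown();
    }
}
```

This is also why ignoring the `Future` returned by `submit` silently drops failures: nothing ever observes the stored exception.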
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########

@@ -411,11 +411,22 @@ private void registerMetricSync() {
      * Logic needs to be invoked by write AND flush since we support various semantics.
      */
     private void checkAsyncException() throws IOException {
-        // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
-
-            asyncProducerException = null;
+            // In the old version, asyncProducerException was set to null here, which causes
+            // another bug [FLINK-35749]: once asyncProducerException is set to null here,
+            // the MailboxExecutor thread in WriterCallback.onCompletion may also invoke
+            // checkAsyncException and get the chance to set asyncProducerException to null
+            // before flush() and write() see the exception. After that, flush and write can
+            // continue writing the next message batch to Kafka, and the old in-flight batch
+            // with exceptions gets lost. We hope that once an exception is thrown while
+            // sending to Kafka, KafkaWriter fails fast and triggers a global restart; then
+            // asyncProducerException is initialized as null again.

Review Comment:
I don't think this change is necessary at all. You can see that we now have inflated error-count metrics in all the tests that you adjusted (those changes should be reverted as well).

The issue is not that we reset the exception; it's that it doesn't bubble up (see below). We actually have a clear contract on the volatile `asyncProducerException`: it is set in `onCompletion` and read+reset in the main thread (which is the mailbox thread). So we never have a chance to concurrently reset the exception and lose it somehow.
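The single-writer/single-reader contract the reviewer describes can be sketched as follows (hypothetical names modeled loosely on `KafkaWriter`; this is an illustration of the contract, not the actual Flink implementation): the producer callback thread only writes the volatile field, and the mailbox thread is the only reader, so read-then-reset cannot race with another reader and lose the exception.

```java
import java.io.IOException;

// Minimal sketch of the volatile hand-off contract: one writer thread
// (the producer callback), one reader thread (the mailbox thread).
public class AsyncExceptionSketch {
    // Written by the Kafka producer's I/O thread, read by the mailbox thread.
    private volatile Exception asyncProducerException;

    // Called from the producer callback (the onCompletion analogue).
    // Keeps the first failure; later failures are not allowed to overwrite it.
    void onCompletion(Exception exception) {
        if (exception != null && asyncProducerException == null) {
            asyncProducerException = exception;
        }
    }

    // Called only from the mailbox (main task) thread, by write() and flush().
    // Since this is the single reader, read-then-reset is safe: no other
    // thread can observe and clear the field in between.
    void checkAsyncException() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            asyncProducerException = null;
            throw new IOException("One or more Kafka producer sends failed", e);
        }
    }
}
```

With this contract, resetting the field is not the bug; the bug fixed in the PR is that the rethrown exception must bubble up out of the mailbox action (hence `execute` instead of `submit`).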