AHeise commented on code in PR #107:
URL: https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1666693262


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -449,12 +460,17 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                 }
 
                 // Checking for exceptions from previous writes
-                mailboxExecutor.submit(
+                // Notice: throwing exception in mailboxExecutor thread is not safe enough for
+                // triggering global
+                // fail over, which has been fixed in [FLINK-31305]. And using
+                // mailboxExecutor.execute() is better than
+                // mailboxExecutor.submit() since submit will swallow exceptions inside.
+                mailboxExecutor.execute(

Review Comment:
   Yes, this is the actual fix: using `execute` instead of `submit`. This bug was introduced by FLINK-31305.
   
   Note: `submit` does not swallow exceptions. It's an anti-pattern to use `submit` without inspecting the returned Future; the Future holds the result/exception so it can be handled asynchronously elsewhere. If we bubbled the exception up, the task would die without the call site being able to react to it properly.
   So please remove the second part of your comment, starting at "And using ...".
   
   I will amend the javadoc of the mailbox executor to make it clear that the executor indeed does not bubble up exceptions from `submit`.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
      * Logic needs to be invoked by write AND flush since we support various semantics.
      */
     private void checkAsyncException() throws IOException {
-        // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
-
-            asyncProducerException = null;
+            // In old version, asyncProducerException will be set to null here, which causes another
+            // bug [FLINK-35749]
+            // once asyncProducerException is set to null here, the MailboxExecutor thread in
+            // WriterCallback.onCompletion may also invoke this method checkAsyncException and get
+            // chance to make
+            // asyncProducerException be null before the function flush() and write() seeing the
+            // exception.
+            // After that, the flush and writer functions can continue to write next message batch
+            // to kafka, the old
+            // in-flight message batch with exceptions get lost.
+            // We hope that once some exceptions thrown during sending kafka, KafkaWriter could be
+            // fast-failed and
+            // trigger global restarting, then asyncProducerException will be initialized as null
+            // again.
Review Comment:
   I don't think this change is necessary at all. You can see that we now have inflated error-count metrics in all the tests that you adjusted (and those should be reverted as well).
   
   The issue is not that we reset the exception; it's that it doesn't bubble up (see below). We actually have a clear contract on the volatile `asyncProducerException`: it is set in `onCompletion` and read+reset in the main thread (which is the mailbox thread). So we never have a chance to concurrently reset the exception and lose it.
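   The contract above can be sketched like this (hypothetical `AsyncErrorHolder` class and method names, not the actual `KafkaWriter` code): the callback thread only ever *sets* the volatile field, and only the mailbox/main thread *reads and resets* it, so the read+reset cannot race with another reset:
   
   ```java
   import java.io.IOException;
   
   public class AsyncErrorHolder {
       // Written by the producer callback thread, read+reset by the mailbox thread.
       private volatile Exception asyncProducerException;
   
       // Called from the producer's network thread on send failure.
       void onCompletion(Exception exception) {
           if (exception != null && asyncProducerException == null) {
               asyncProducerException = exception;
           }
       }
   
       // Called only from the mailbox (main) thread by write() and flush().
       void checkAsyncException() throws IOException {
           Exception e = asyncProducerException;
           if (e != null) {
               // Safe: this is the only thread that ever resets the field.
               asyncProducerException = null;
               throw new IOException("failed asynchronously", e);
           }
       }
   
       public static void main(String[] args) {
           AsyncErrorHolder holder = new AsyncErrorHolder();
           holder.onCompletion(new RuntimeException("send failed"));
           try {
               holder.checkAsyncException();
           } catch (IOException e) {
               System.out.println("rethrown: " + e.getCause().getMessage());
           }
       }
   }
   ```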
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
