JimmyZZZ commented on code in PR #107:
URL: https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1668387172


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
      * Logic needs to be invoked by write AND flush since we support various semantics.
      */
     private void checkAsyncException() throws IOException {
-        // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
-
-            asyncProducerException = null;
+            // In old version, asyncProducerException will be set to null here, which causes another
+            // bug [FLINK-35749]
+            // once asyncProducerException is set to null here, the MailboxExecutor thread in
+            // WriterCallback.onCompletion may also invoke this method checkAsyncException and get
+            // chance to make
+            // asyncProducerException be null before the function flush() and write() seeing the
+            // exception.
+            // After that, the flush and writer functions can continue to write next message batch
+            // to kafka, the old
+            // in-flight message batch with exceptions get lost.
+            // We hope that once some exceptions thrown during sending kafka, KafkaWriter could be
+            // fast-failed and
+            // trigger global restarting, then asyncProducerException will be initialized as null
+            // again.

Review Comment:
   @AHeise I've removed my earlier modification to the reset logic in 
checkAsyncException(). I understand what you mean by "we never have a 
chance to concurrently reset the exception", and I agree with you.
   
   By the way, I'm still a little curious: if we don't reset 
asyncProducerException to null in checkAsyncException(), what problems would 
we run into? The comment from [FLINK-31305] says: "Propagate the first 
exception since amount of exceptions could be large." Is that the only reason 
we throw the exception just once? That doesn't seem very persuasive to me.
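
To make the question concrete, here is a minimal sketch of the two variants being compared. It is not the real KafkaWriter: `AsyncErrorSketch`, `onSendFailure`, `checkFails` and the `resetOnCheck` flag are hypothetical names introduced only for illustration, and the sketch is single-threaded, so it does not claim that any race actually occurs in the connector. It only shows what a second call to `checkAsyncException()` observes in each variant.

```java
import java.io.IOException;

/** Hypothetical stand-in for the writer's error handling; NOT the real KafkaWriter. */
public class AsyncErrorSketch {
    // Mirrors the field name in the diff; everything else here is an assumption.
    private volatile Exception asyncProducerException;
    private final boolean resetOnCheck;

    public AsyncErrorSketch(boolean resetOnCheck) {
        this.resetOnCheck = resetOnCheck;
    }

    /** Simulates a producer callback reporting a failed send; keeps only the first failure. */
    public void onSendFailure(Exception e) {
        if (asyncProducerException == null) {
            asyncProducerException = e;
        }
    }

    /** Throws if a failure was recorded; optionally clears it, as the old code did. */
    public void checkAsyncException() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            if (resetOnCheck) {
                asyncProducerException = null;
            }
            throw new IOException("async send failed", e);
        }
    }

    /** Helper for the demo: returns true if the check threw. */
    public boolean checkFails() {
        try {
            checkAsyncException();
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        for (boolean reset : new boolean[] {true, false}) {
            AsyncErrorSketch writer = new AsyncErrorSketch(reset);
            writer.onSendFailure(new RuntimeException("send failed"));
            boolean first = writer.checkFails();  // first caller to check
            boolean second = writer.checkFails(); // any later caller, e.g. write(), flush(), close
            System.out.println(
                    "resetOnCheck=" + reset + " first=" + first + " second=" + second);
        }
    }
}
```

With `resetOnCheck=true` only the first check throws; with `resetOnCheck=false` every later check throws again. The comment removed in the diff ("reset this exception since we could close the writer later on") suggests the reset was meant to keep a later close from rethrowing, which is the behavior the second variant gives up.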



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
