AHeise commented on code in PR #107:
URL: https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1669917010


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
      * Logic needs to be invoked by write AND flush since we support various semantics.
      */
     private void checkAsyncException() throws IOException {
-        // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
-
-            asyncProducerException = null;
+            // In the old version, asyncProducerException was set to null here, which caused
+            // another bug [FLINK-35749]: once asyncProducerException is set to null here, the
+            // MailboxExecutor thread in WriterCallback.onCompletion may also invoke
+            // checkAsyncException and get the chance to set asyncProducerException to null
+            // before flush() and write() see the exception.
+            // After that, flush() and write() can continue writing the next batch of messages
+            // to Kafka, and the old in-flight batch that hit the exception is lost.
+            // Instead, once an exception is thrown while sending to Kafka, the KafkaWriter
+            // should fail fast and trigger a global restart, after which
+            // asyncProducerException is initialized to null again.

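To make the race described in the new comment concrete, here is a minimal standalone sketch (illustrative class and method names, not the actual `KafkaWriter` code) of how the old read-then-reset pattern can drop an exception when two threads call it:

```java
import java.io.IOException;

// Illustrative sketch only; the field and method names mirror the diff
// above, but this is not the actual KafkaWriter implementation.
class NaiveExceptionHolder {
    private volatile Exception asyncProducerException;

    // Invoked from the producer's I/O thread when an async send fails.
    void onSendFailure(Exception e) {
        asyncProducerException = e;
    }

    // Old behavior: read, then reset. If the MailboxExecutor thread runs
    // this first, it nulls the field before write()/flush() ever observes
    // the failure, so the writer keeps sending and the failed in-flight
    // batch is silently lost.
    void checkAsyncException() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            asyncProducerException = null; // the racy reset removed by this PR
            throw new IOException("One or more Kafka sends failed", e);
        }
    }
}
```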
Review Comment:
   It's a good point to raise whether we need to reset at all. I'm assuming @mas-chen did it to emulate the previous behavior.
   
   From what I can see, the only real downside is that we rethrow the exception in `close` (which is always called, also in case of errors). That could prevent some proper cleanup and will count the exception twice in most cases, which inflates the metrics. I think the latter point has quite a few side effects:
   
   I expect most customers to have alerts on the exception metrics, so this behavior change would fire alerts for more users than previous versions did. In the worst case, some poor data engineer gets paged in the middle of the night because of it, and I'd expect a few alerts to then be retuned to be less noisy.
   
   All in all, I think resetting the exception would be good IF we are all sure that no exception can be missed. Being sure that we can't end up with corrupted state is of course more important than having accurate metrics, but ideally we have both.
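   
   For what it's worth, here is a hypothetical sketch of one way to keep the reset while guaranteeing the exception is consumed exactly once: an atomic getAndSet. This assumes that a single rethrow, from whichever caller wins the race, is enough to fail the task, which is exactly the property we would have to verify before going this route.
   
   ```java
   import java.io.IOException;
   import java.util.concurrent.atomic.AtomicReference;
   
   // Hypothetical sketch, not the PR's actual change: getAndSet(null) lets
   // exactly one caller observe and rethrow the exception, so close() would
   // not rethrow it a second time and double-count it in the metrics.
   class AtomicExceptionHolder {
       private final AtomicReference<Exception> asyncProducerException =
               new AtomicReference<>();
   
       void onSendFailure(Exception e) {
           // Keep the first failure; later failures are dropped for simplicity.
           asyncProducerException.compareAndSet(null, e);
       }
   
       void checkAsyncException() throws IOException {
           Exception e = asyncProducerException.getAndSet(null);
           if (e != null) {
               throw new IOException("One or more Kafka sends failed", e);
           }
       }
   }
   ```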


