AHeise commented on code in PR #107:
URL: https://github.com/apache/flink-connector-kafka/pull/107#discussion_r1669917010
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -411,11 +411,22 @@ private void registerMetricSync() {
      * Logic needs to be invoked by write AND flush since we support various semantics.
      */
     private void checkAsyncException() throws IOException {
-        // reset this exception since we could close the writer later on
         Exception e = asyncProducerException;
         if (e != null) {
-
-            asyncProducerException = null;
+            // In the old version, asyncProducerException was reset to null here, which caused
+            // another bug [FLINK-35749]: once asyncProducerException is set to null, the
+            // MailboxExecutor thread in WriterCallback.onCompletion may also invoke
+            // checkAsyncException and clear asyncProducerException before flush() and write()
+            // see the exception. flush() and write() then continue writing the next batch to
+            // Kafka, and the old in-flight batch that hit the exception is silently lost.
+            // Instead, once an exception is thrown while sending to Kafka, the KafkaWriter
+            // should fail fast and trigger a global restart, after which
+            // asyncProducerException is initialized to null again.

Review Comment:
It's a good question whether we need to reset at all. I'm assuming @mas-chen did it to emulate the previous behavior.

From what I can see, the only real downside of not resetting is that we rethrow the exception in `close` (which is always called, also in case of errors). That could prevent some proper cleanup and will count the exception twice in most cases, which inflates the metrics.

I think the latter point has quite a few side effects: I expect most customers to have alerts on the exception metrics, so this is a behavior change that would alert more users than previous versions did. In the worst case, some poor data engineer gets paged in the middle of the night because of it, and a few alerts would have to be tuned to be less noisy.

All in all, resetting the exception would be good IF we are all sure that no exception can be missed. Being sure that we can't have corrupted state is of course more important than having accurate metrics, but ideally we have both.
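To make the trade-off concrete, here is a minimal, self-contained sketch of the two behaviors being discussed. This is not the actual KafkaWriter code: the class name AsyncExceptionSketch, the methods checkAndReset/checkWithoutReset, and the simulated failure are made up for illustration, and the real field lives in org.apache.flink.connector.kafka.sink.KafkaWriter.

import java.io.IOException;

/**
 * Illustration only (not Flink code): contrasts resetting vs. keeping the
 * asynchronously reported producer exception.
 */
public class AsyncExceptionSketch {

    /** Set from the producer callback thread, read from the task thread. */
    private volatile Exception asyncProducerException =
            new RuntimeException("simulated send failure");

    /** Old behavior: report the exception once, then clear it. */
    void checkAndReset() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            // Clearing here means whichever code path calls this method first
            // "consumes" the exception; later callers (write/flush) see null
            // and may continue writing the next batch.
            asyncProducerException = null;
            throw new IOException("KafkaWriter failed asynchronously", e);
        }
    }

    /** Proposed behavior: keep the exception until the writer is torn down. */
    void checkWithoutReset() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            // Every caller, including close(), keeps seeing the same failure,
            // so nothing is lost, but the exception is surfaced (and counted
            // in metrics) more than once.
            throw new IOException("KafkaWriter failed asynchronously", e);
        }
    }

    public static void main(String[] args) {
        AsyncExceptionSketch resetting = new AsyncExceptionSketch();
        try {
            resetting.checkAndReset(); // first caller (e.g. a mailbox action) throws...
        } catch (IOException expected) {
            System.out.println("reset variant, first caller saw: " + expected.getMessage());
        }
        try {
            resetting.checkAndReset(); // ...but the second caller (write/flush) sees nothing
            System.out.println("reset variant, second caller saw no exception: failure masked");
        } catch (IOException unexpected) {
            System.out.println("reset variant, second caller saw: " + unexpected.getMessage());
        }

        AsyncExceptionSketch keeping = new AsyncExceptionSketch();
        for (int call = 1; call <= 2; call++) {
            try {
                keeping.checkWithoutReset(); // every caller, including close(), rethrows
            } catch (IOException rethrown) {
                System.out.println(
                        "no-reset variant, call " + call + " saw: " + rethrown.getMessage());
            }
        }
    }
}

Run as-is, the first call to checkAndReset throws while the second one reports that the failure was masked; the no-reset variant throws on every call, which is the double-counting concern raised in the comment above.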