[ https://issues.apache.org/jira/browse/FLINK-35749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862701#comment-17862701 ]
Jimmy Zhao commented on FLINK-35749:
------------------------------------

Thanks [~fpaul]. The root cause is in KafkaWriter: WriterCallback#onCompletion tries to expose the Kafka exception to the KafkaWriter, but the shared exception field can be reset to null by the MailboxExecutor thread. As a result, the write and flush functions in KafkaWriter never observe the exception and continue sending Kafka messages as if nothing happened (a minimal sketch of this race is included after the quoted issue description below).

About your questions:
* How did you configure your Flink job e.g. checkpointing, boundedness? A: checkpointing is enabled with an interval of 20 seconds, and the input is bounded
* Which Flink version do you use? A: I use Flink 1.16.2
* Are you using the KafkaSink with exactly once semantics? A: no, I use it with AT_LEAST_ONCE semantics
* How do you count the messages in the output topic, e.g., only committed records? A: I use another consumer to pull all the messages and write them to a MongoDB table. I can also see the count in the Kafka admin console.

In my case I didn't use Kafka transactions or EXACTLY_ONCE, but I think this problem also affects EXACTLY_ONCE.

> Kafka sink component will lose data when kafka cluster is unavailable for a while
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-35749
>                 URL: https://issues.apache.org/jira/browse/FLINK-35749
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.2, 1.17.1, 1.20.0
>            Reporter: Jimmy Zhao
>            Assignee: Jimmy Zhao
>            Priority: Blocker
>
> As the title describes, here is the procedure to reproduce the problem:
> 1. develop a simple Flink stream job that consumes from one Kafka topic and sinks to another Kafka server and topic
> 2. produce a known number of Kafka messages to the source topic and record the message count
> 3. start the Flink stream job, configured to consume the source topic from the earliest offset
> 4. while the job is consuming the source topic, restart the Kafka cluster (we use AWS MSK)
> 5. the Flink job will not throw any exception, as if nothing happened, and only prints error logs like: [kafka-producer-network-thread | producer-2] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Node 2 disconnected.
> 6. wait for the Kafka cluster to finish restarting and for all source Kafka messages to be consumed
> 7. count the messages in the target Kafka topic and compare to the source; there is a high probability of data loss (more than 50%)
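To make the race concrete, here is a minimal sketch; this is not the actual KafkaWriter source. The class and member names (AsyncExceptionRaceSketch, asyncProducerException, checkAsyncException) are simplified stand-ins, and the swallowed-exception path is an assumption based on the behavior described in the comment above. It only illustrates the shared-field pattern: the producer network thread records the failure, a mailbox-thread code path resets the field to null, and later write()/flush() calls then see nothing and keep producing.

{code:java}
import java.io.IOException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical sketch of the race, not the actual KafkaWriter code.
class AsyncExceptionRaceSketch {

    // Shared between the kafka-producer-network-thread (which sets it)
    // and the mailbox/task thread (which reads and resets it).
    private volatile Exception asyncProducerException;

    // Runs on the kafka-producer-network-thread when a send completes.
    // The real writer additionally enqueues a mail on the MailboxExecutor
    // that will surface the failure on the task thread.
    final Callback writerCallback =
            (RecordMetadata metadata, Exception exception) -> {
                if (exception != null) {
                    asyncProducerException = exception; // (1) failure recorded
                }
            };

    // Runs on the mailbox thread, both from the enqueued mail and from
    // write()/flush().
    private void checkAsyncException() throws IOException {
        Exception e = asyncProducerException;
        if (e != null) {
            asyncProducerException = null; // (2) field reset to null
            throw new IOException("Kafka producer send failed", e);
        }
    }

    // Called on the mailbox thread for every incoming record.
    void write(byte[] record) throws IOException {
        // If step (2) already ran on another code path and the exception it
        // threw was swallowed (the scenario described in the comment), this
        // check sees null and the writer keeps sending records, losing data.
        checkAsyncException();
        // producer.send(new ProducerRecord<>(topic, record), writerCallback);
    }
}
{code}

A natural fix direction, following the comment's reasoning, would be to reset the field only on a path that is guaranteed to propagate the exception to the job; the sketch is just meant to show why resetting it anywhere else loses the failure.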