[ 
https://issues.apache.org/jira/browse/FLINK-35749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862701#comment-17862701
 ] 

Jimmy Zhao commented on FLINK-35749:
------------------------------------

Thanks [~fpaul]. The root cause is in KafkaWriter: WriterCallback#onCompletion 
tries to surface the Kafka exception to the KafkaWriter, but the shared 
exception field can be reset to null by the mailbox executor thread. As a 
result, the write and flush functions in KafkaWriter never see the exception 
and keep sending Kafka messages as if nothing had happened.
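
To illustrate what I mean, here is a very simplified sketch of the race. The 
field and method names (asyncProducerException, checkAsyncException, 
consumeAndClear) are only illustrative assumptions, not the actual KafkaWriter 
source:

{code:java}
// Simplified sketch of the suspected race, NOT the real KafkaWriter code.
class WriterSketch {

    private volatile Exception asyncProducerException;

    // Kafka producer network thread: WriterCallback#onCompletion stores the failure.
    void onCompletion(Exception exception) {
        if (exception != null) {
            asyncProducerException = exception;
        }
    }

    // Mailbox executor thread: a queued mail consumes the stored exception
    // and resets the field to null.
    void consumeAndClear() {
        asyncProducerException = null;   // <-- may run before the check below
    }

    // Task thread inside write()/flush(): only fails if the field is still set.
    void checkAsyncException() {
        Exception e = asyncProducerException;  // sees null here, so ...
        if (e != null) {
            throw new RuntimeException(e);
        }
        // ... the writer keeps sending records as if nothing happened.
    }
}
{code}

If the mailbox executor thread clears the field before write()/flush() check 
it, the failure is silently dropped and the writer keeps producing.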

And about your questions:
 * How did you configure your Flink job e.g. checkpointing, boundedness?
A: Checkpointing is enabled with a 20-second interval, and the job is bounded 
(a minimal sketch of this setup follows the list).
 * Which Flink version do you use?
A: I use Flink 1.16.2.
 * Are you using the KafkaSink with exactly once semantics?
A: No, I use it with AT_LEAST_ONCE semantics.
 * How do you count the messages in the output topic, e.g., only committed 
records?
A: I use another consumer to pull all the messages and write them to a MongoDB 
table. I can also see the count in the Kafka admin tooling.
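
For reference, a minimal sketch of the kind of job and configuration described 
above (broker addresses, topic names, and the string serialization schema are 
placeholders, not my actual values; the boundedness setting is omitted):

{code:java}
// Minimal sketch: Kafka source -> Kafka sink, 20s checkpointing, AT_LEAST_ONCE.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaToKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(20_000); // 20-second checkpoint interval

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("source-broker:9092")          // placeholder
                .setTopics("source-topic")                          // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())  // consume from earliest
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("target-broker:9092")          // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("target-topic")                   // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // no transactions
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .sinkTo(sink)
           .name("kafka-sink");

        env.execute("kafka-to-kafka");
    }
}
{code}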


In my case, I didn't use Kafka transactions or EXACTLY_ONCE, but I think this 
problem also affects EXACTLY_ONCE.

> Kafka sink component will lose data when kafka cluster is unavailable for a 
> while
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-35749
>                 URL: https://issues.apache.org/jira/browse/FLINK-35749
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.2, 1.17.1, 1.20.0
>            Reporter: Jimmy Zhao
>            Assignee: Jimmy Zhao
>            Priority: Blocker
>
> As the title describes, here is the procedure to reproduce the problem:
> 1. develop a simple Flink streaming job that consumes from one Kafka topic 
> and sinks to another Kafka server and topic
> 2. produce a known number of Kafka messages to the source Kafka topic and 
> record the message count
> 3. start the Flink streaming job, configured to consume from the earliest 
> source topic offset
> 4. while the job is consuming the source topic, restart the Kafka cluster 
> (we use AWS MSK)
> 5. the Flink job will not throw any exception, as if nothing happened, and 
> only prints log lines like: [kafka-producer-network-thread | producer-2] INFO  
> org.apache.kafka.clients.NetworkClient  - [Producer clientId=producer-2] Node 
> 2 disconnected.
> 6. wait for the Kafka cluster to finish restarting and for all the source 
> Kafka messages to be consumed
> 7. count the messages in the target Kafka topic and compare to the source; 
> there is a high probability of data loss (more than 50%)



