[ https://issues.apache.org/jira/browse/FLINK-35749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863147#comment-17863147 ]
Jimmy Zhao commented on FLINK-35749:
------------------------------------

[~martijnvisser] Yes, I reproduced this bug by following the procedure in the description with the latest stable release 1.19 and flink-connector-kafka version 3.2.0-1.19. In my case I had set the producer config retries=0, because I want all messages to stay strictly ordered once an exception happens, and that makes this bug very easy to trigger. This is probably why most KafkaSink users have not hit the problem before: they leave the retries parameter at its default (2147483647). I still think we need to fix it, since users like me will set retries=0, or to fewer than 5, for a reason. Also, apart from the case of restarting the Kafka cluster, there are other Kafka exceptions that cannot be resolved by retrying a few times and will still cause message loss. For example, an exception caused by a message body exceeding the message.max.bytes limit may be swallowed by mailboxExecutor.submit. (A minimal sketch of this setup is included at the end of this message.)

> Kafka sink component will lose data when kafka cluster is unavailable for a while
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-35749
>                 URL: https://issues.apache.org/jira/browse/FLINK-35749
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.2, 1.17.1, 1.20.0
>            Reporter: Jimmy Zhao
>            Assignee: Jimmy Zhao
>            Priority: Blocker
>
> As the title describes, here is the procedure to reproduce the problem:
> 1. Develop a simple Flink streaming job that consumes from one Kafka topic and sinks to another Kafka server and topic.
> 2. Produce a known amount of Kafka messages to the source topic and record the message count.
> 3. Start the Flink streaming job, configured to consume the source topic from the earliest offset.
> 4. While the job is consuming the source topic, restart the Kafka cluster (we use AWS MSK).
> 5. The Flink job does not throw any exception, as if nothing happened; it only prints error logs like: [kafka-producer-network-thread | producer-2] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Node 2 disconnected.
> 6. Wait until the Kafka cluster has finished restarting and all source Kafka messages have been consumed.
> 7. Count the messages in the target Kafka topic and compare with the source: there is a high probability of data loss (more than 50%).
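To make the setup concrete, here is a minimal sketch of the kind of pass-through job and producer configuration described above (steps 1-3 of the description), with retries=0 set on the KafkaSink producer. Broker addresses, topic names, the group id and the class name are placeholders for illustration, not the actual job or MSK setup:

{code:java}
// Hedged sketch only: placeholders everywhere, not the production job from the report.
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

public class Flink35749Repro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 3 of the description: consume the source topic from the earliest offset.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("source-broker:9092")          // placeholder
                .setTopics("source-topic")                          // placeholder
                .setGroupId("flink-35749-repro")                    // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Producer config from the comment above: retries=0 so that a failed send is not
        // retried (keeps record order) and the producer error surfaces on the first failure.
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "0");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("target-broker:9092")          // placeholder
                .setKafkaProducerConfig(producerProps)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("target-topic")                   // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();

        // Step 1: a simple pass-through from the source topic to the target topic.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "source-topic")
                .sinkTo(sink);

        env.execute("FLINK-35749 repro");
    }
}
{code}

With retries=0 the first failed send during the broker restart is never retried by the Kafka client, so whether the callback error is surfaced or silently swallowed by the writer decides directly whether those records are lost.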