[ https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17786872#comment-17786872 ]
Tzu-Li (Gordon) Tai commented on FLINK-33293:
---------------------------------------------

We have a new ticket whose description seems to contradict my understanding / assumption of how flushing has been working in the KafkaWriter: https://issues.apache.org/jira/browse/FLINK-33545

If what that ticket describes is indeed true, it would also explain the mystery here, i.e. in at-least-once mode, flushing the producer doesn't actually flush everything and ensure all records are successfully written before the checkpoint barrier is processed.

> Data loss with Kafka Sink
> -------------------------
>
>                 Key: FLINK-33293
>                 URL: https://issues.apache.org/jira/browse/FLINK-33293
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.1, 1.16.2, 1.17.1
>            Reporter: Jasmin Redzepovic
>            Priority: Major
>         Attachments: job.log, job_1_16_2_run1.log, job_1_16_2_run2.log, job_1_17_1_run1.log, job_1_17_1_run2.log
>
>
> More info in the Slack discussion: [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>
> *TLDR:*
> (I was unable to reproduce the issue on Flink 1.15, but I can reproduce it on 1.16 and 1.17.)
>
> I created a sink topic with 8 partitions, a replication factor of 3, and min.insync.replicas set to 2. The consumer properties are left at their default values.
> For the producer, I changed the delivery.timeout.ms and request.timeout.ms properties to *5000ms* and *4000ms* respectively. (acks is set to -1 by default, which is equivalent to _all_, I guess.)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job parallelism is set to 1 and the checkpointing interval to 2000ms. I started a Flink job and monitored its logs. In parallel, I consumed the __consumer_offsets topic to track when offsets were committed for my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, which exceeds Kafka's delivery timeout (5000ms). Although it was marked as completed, I believe the output buffer of KafkaSink was not fully acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 but immediately encountered a Kafka _TimeoutException: Expiring N records_.
> I suspect that this exception originated from checkpoint 5 and that checkpoint 5 should not have been considered successful. The job then failed but recovered from checkpoint 5. Some time after checkpoint 7, consumer offsets were committed to Kafka, and this process repeated once more at checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka but the output buffer was only partially delivered, data has been lost. I confirmed this when sinking the topic to a database.
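For reference, below is a minimal sketch of a job wired the way the description above reports: AT_LEAST_ONCE KafkaSink, delivery.timeout.ms=5000, request.timeout.ms=4000, acks left at the default, parallelism 1, and 2000ms checkpoints. The broker address, topic name, and in-memory source are placeholders for illustration, not taken from the reporter's actual job.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class KafkaSinkAtLeastOnceRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);           // job parallelism 1, as in the report
        env.enableCheckpointing(2000L);  // 2000ms checkpointing interval, as in the report

        // Producer overrides from the report; acks stays at its default (-1 / "all").
        Properties producerProps = new Properties();
        producerProps.setProperty("delivery.timeout.ms", "5000");
        producerProps.setProperty("request.timeout.ms", "4000");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder broker address
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sink-topic")          // placeholder topic name
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Placeholder source; the reported job reads from a Kafka source topic instead.
        env.fromElements("a", "b", "c").sinkTo(sink);

        env.execute("FLINK-33293 repro sketch");
    }
}
{code}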