[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15927441#comment-15927441 ]
ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3549

    Thanks for the review! Travis is green (local branch), with only test timeouts:
    https://travis-ci.org/tzulitai/flink

    Merging this ..

> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.3.0, 1.1.5, 1.2.1
>
>
> Reported in the user mailing list:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
>
> The problem:
> The producer holds a {{pendingRecords}} counter that is incremented on each
> invoke() and decremented on each callback; it is used to decide whether the
> producer needs to sync on pending callbacks on checkpoints.
> On each checkpoint, the checkpoint should only be considered successful iff,
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, only {{pendingRecords}} is checked).
> A quick fix is to check and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after flushing completes
> (i.e., once {{pendingRecords}} reaches 0).

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
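For illustration, a minimal sketch of the quick fix described in the issue text. This is not the actual FlinkKafkaProducerBase source; the field and helper names ({{asyncException}}, {{pendingRecords}}, {{flush()}}, {{checkAsyncException()}}) follow the wording of the description above and are otherwise hypothetical.

{code:java}
// Simplified, hypothetical producer skeleton illustrating the quick fix.
// The real snapshotState takes a Flink FunctionSnapshotContext argument;
// it is omitted here to keep the sketch dependency-free.
public abstract class AsyncAwareProducerSketch {

    // Set by the Kafka send callback when an asynchronous send fails.
    private volatile Exception asyncException;

    // Incremented in invoke(), decremented in the send callback.
    protected long pendingRecords;

    // Blocks until pendingRecords == 0, i.e. all in-flight sends are acknowledged.
    protected abstract void flush();

    public void snapshotState() throws Exception {
        // Rethrow any failure recorded before this checkpoint started,
        // so the checkpoint fails instead of silently losing records.
        checkAsyncException();

        // Wait until all pending records have been acknowledged.
        flush();

        // A callback may have failed while we were flushing; check again so the
        // checkpoint only succeeds if pendingRecords == 0 AND asyncException == null.
        checkAsyncException();
    }

    private void checkAsyncException() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null;
            throw e;
        }
    }
}
{code}

Checking twice is the point of the fix: an exception recorded while {{flush()}} is waiting would otherwise only surface on the next invoke(), after the checkpoint has already been acknowledged.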