[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344654#comment-15344654 ]
ASF GitHub Bot commented on FLINK-4027:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2108#discussion_r68085762

    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
    @@ -113,6 +123,14 @@
         /** Errors encountered in the async producer are stored here */
         protected transient volatile Exception asyncException;
    +    /**
    +     * Number of unacknowledged records.
    +     * There is no need to introduce additional locks because invoke() and snapshotState() are
    +     * never called concurrently. So blocking the snapshotting will lock the invoke() method until all
    +     * pending records have been confirmed.
    +     */
    --- End diff --

    I think the fact that `invoke` and `snapshotState` are mutually exclusive is not important for the semantics of the `pendingRecords` variable. The reason is that it will only be incremented in `invoke` and decremented in the `callbacks` of the Kafka producer.

> FlinkKafkaProducer09 sink can lose messages
> -------------------------------------------
>
>                 Key: FLINK-4027
>                 URL: https://issues.apache.org/jira/browse/FLINK-4027
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>            Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously. A callback can record
> publishing errors, which will be raised when detected. But as far as I can
> tell, there is no barrier to wait for async errors from the sink when
> checkpointing or to track the event time of acked messages to inform the
> checkpointing process.
>
> If a checkpoint occurs while there are pending publish requests, and the
> requests return a failure after the checkpoint occurred, those messages will
> be lost as the checkpoint will consider them processed by the sink.
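The review describes a specific pattern. A counter is incremented in `invoke` and decremented in the producer callback, and the snapshot waits until that counter reaches zero. This pattern can be sketched without Flink or Kafka on the classpath.

This is a minimal sketch under stated assumptions, not the code from the pull request. `PendingRecordsSinkDemo`, `FakeAsyncProducer`, `PendingRecordsSink`, `pending()` and `rethrowAsyncError()` are hypothetical names. The fake producer only imitates an asynchronous send that reports completion through a callback on another thread.

The increment and the decrement run on different threads. For that reason, the sketch guards the counter with a monitor, whether or not `invoke` and `snapshotState` can overlap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class PendingRecordsSinkDemo {

    /** Hypothetical async client: send() returns at once, the callback fires later on an I/O thread. */
    static final class FakeAsyncProducer {
        private final ExecutorService ioThreads = Executors.newFixedThreadPool(2);

        void send(String record, BiConsumer<String, Exception> callback) {
            ioThreads.submit(() -> {
                try { Thread.sleep(5); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
                // Records whose name starts with "bad" simulate a send that fails after the fact.
                Exception error = record.startsWith("bad") ? new RuntimeException("send failed: " + record) : null;
                callback.accept(record, error);
            });
        }

        void close() throws InterruptedException {
            ioThreads.shutdown();
            ioThreads.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    /** Hypothetical sink: counts unacknowledged records and drains them before a snapshot. */
    static final class PendingRecordsSink {
        private final FakeAsyncProducer producer;
        private final Object lock = new Object();
        private long pendingRecords = 0;           // guarded by lock
        private volatile Exception asyncException; // first error reported by a callback

        PendingRecordsSink(FakeAsyncProducer producer) { this.producer = producer; }

        /** Task thread: the only place the counter is incremented. */
        void invoke(String record) throws Exception {
            rethrowAsyncError();
            synchronized (lock) { pendingRecords++; }
            producer.send(record, (sent, error) -> {
                // Producer I/O thread: the only place the counter is decremented.
                if (error != null && asyncException == null) {
                    asyncException = error;
                }
                synchronized (lock) {
                    if (--pendingRecords == 0) {
                        lock.notifyAll(); // wake a snapshot waiting for the drain
                    }
                }
            });
        }

        /** Blocks until every record sent so far is acknowledged, then surfaces any send error. */
        void snapshotState() throws Exception {
            synchronized (lock) {
                while (pendingRecords > 0) {
                    lock.wait();
                }
            }
            rethrowAsyncError(); // fail the checkpoint rather than silently drop records
        }

        long pending() { synchronized (lock) { return pendingRecords; } }

        private void rethrowAsyncError() throws Exception {
            Exception e = asyncException;
            if (e != null) throw new Exception("Failed to send data", e);
        }
    }

    public static void main(String[] args) throws Exception {
        FakeAsyncProducer producer = new FakeAsyncProducer();
        PendingRecordsSink sink = new PendingRecordsSink(producer);
        try {
            for (int i = 0; i < 20; i++) {
                sink.invoke("record-" + i);
            }
            sink.snapshotState(); // returns only after all 20 callbacks have run
            System.out.println("pending after first snapshot: " + sink.pending());

            sink.invoke("bad-record"); // this send fails asynchronously
            try {
                sink.snapshotState();
                System.out.println("second snapshot succeeded");
            } catch (Exception e) {
                System.out.println("second snapshot failed: " + e.getCause().getMessage());
            }
        } finally {
            producer.close();
        }
    }
}
```

Running `main` should produce two results:

- The first snapshot reports zero pending records.
- The second snapshot fails because of the injected `bad-record` error.

That second result is the behaviour the issue asks for. A send that fails after being handed to the producer fails the checkpoint instead of being counted as processed.

Using `wait`/`notifyAll` lets `snapshotState` sleep until the drain completes. A bare `AtomicLong` would keep the counter consistent, but it would need polling to wait for zero.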