Am I correct in assuming that the Kafka producer sink can lose messages? I don't expect exactly-once semantics when using Kafka as a sink, given Kafka's publishing guarantees, but I do expect at-least-once.
I gather from reading the source that the producer publishes messages asynchronously, as expected, and that a callback records publishing errors, which are raised when detected. But as far as I can tell, there is no barrier that waits for asynchronous errors from the sink when checkpointing. Did I miss it? To do so you would have to call the flush() method on the KafkaProducer, which blocks until all pending requests have succeeded or failed.

Given that FlinkKafkaProducer09 is just a SinkFunction with a simple invoke() method, there doesn't appear to be a way to ensure the sink has published all pending writes successfully. So it seems that if a checkpoint occurs while there are pending publish requests, and those requests fail after the checkpoint has completed, those messages will be lost, as the checkpoint will consider them already processed by the sink.

It seems there is an implicit expectation that a SinkFunction is synchronous. Perhaps there is a need for an AsyncSinkFunction interface with a method that blocks until pending messages are flushed, or the sink should keep track of which messages have been successfully published, so that the checkpointing system can use that information.
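To make the missing barrier concrete, here is a minimal, self-contained sketch of the pattern I have in mind. It is not the actual Flink or Kafka API: FlushingSink, invoke(), snapshotState(), and the ExecutorService standing in for the network client are all hypothetical, chosen to mirror KafkaProducer.send(record, callback) and KafkaProducer.flush(). The idea is that invoke() counts each in-flight publish, the completion callback decrements the count and records any error, and a checkpoint-time barrier blocks until the count reaches zero and then rethrows any recorded error.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a sink that flushes pending async writes
// before a checkpoint completes. Not the real FlinkKafkaProducer09.
public class FlushingSink {
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);
    private final AtomicInteger pendingRecords = new AtomicInteger();
    private volatile Exception asyncError;

    // invoke(): hand the record off asynchronously, mirroring
    // KafkaProducer.send(record, callback).
    public void invoke(String record) throws Exception {
        checkAsyncError();
        pendingRecords.incrementAndGet();
        ioPool.submit(() -> {
            try {
                Thread.sleep(5); // simulated network publish
            } catch (InterruptedException e) {
                asyncError = e;  // callback records the failure
            } finally {
                synchronized (pendingRecords) {
                    pendingRecords.decrementAndGet();
                    pendingRecords.notifyAll();
                }
            }
        });
    }

    // The missing barrier: called when a checkpoint is taken, it
    // blocks until every pending publish has succeeded or failed,
    // analogous to calling KafkaProducer.flush().
    public void snapshotState() throws Exception {
        synchronized (pendingRecords) {
            while (pendingRecords.get() > 0) {
                pendingRecords.wait(100);
            }
        }
        checkAsyncError(); // surface failures before acking the checkpoint
    }

    public int pendingCount() {
        return pendingRecords.get();
    }

    private void checkAsyncError() throws Exception {
        if (asyncError != null) {
            throw asyncError;
        }
    }

    public void close() {
        ioPool.shutdown();
    }
}
```

With a barrier like snapshotState() in place, a checkpoint can only complete once every outstanding publish has been acknowledged or has raised its error, closing the window in which acknowledged-but-unpublished messages could be lost.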