Hello Yun, Thanks for the quick response. This is really helpful.
I have confirmed with Oracle Streaming Service (OSS) that they currently don’t support EXACTLY_ONCE semantic, only AT_LEAST_ONCE semantic works. They suggest to add some deduplicate mechanisms at Sink to mitigate the issue. Question 1: So the scenario should looks like this: When the flink application restarts after it fails, it will start from this checkpoint offset. The messages has been processed after the checkpoint before the failure will be processed twice here after the restart. Is there any chance of data corruption here, for example, breaking the window and sending out incomplete records? I am using some session windows based on DataStream event time timers. Question 2: For the KafkaSource<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-source>, I noticed that we don’t have a place to configure the semantic? Maybe enabling the checkpoint with EXACTLY_ONCE should guarantee the source’s exactly once semantic here? Please correct me if I am wrong here. Question 3: To guarantee the end-to-end exactly once, I think we must make sure the sink is exactly once, right? Since OSS has such limitation, is it possible to achieve effective EXACTLY_ONCE semantic through additional logic at Flink side since I can’t do too much on OSS side? Or it is technically impossible? If possible, I think I should implement the Sink<https://github.com/apache/flink/blob/release-1.14/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java> you mentioned. Thank you very much for the help! Fuyao From: Yun Gao <yungao...@aliyun.com> Date: Wednesday, February 9, 2022 at 23:17 To: Fuyao Li <fuyao...@oracle.com>, user <user@flink.apache.org> Subject: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink? Hi Fuyao, Logically if a system want to support end-to-end exactly once, it should support transactions: 1. The transactions hold the records, it get pre-committed on snapshot state and get committed on checkpont succeed. 2. The transaction should still be able to be aborted after pre-committed. 3. Once pre-committed, the transactions must be able to be committed, even if the flink jobs fails between pre-committed and committed, after the job restarted these transaction should be able to be committed again. If the external system meet such conditions, to implement an exactly-once sink, the option b) should be more recommend. However, these interface is newly added in the upcoming 1.15 and it might need to be wait for about 1.5 month before releasing. An early version for option b is the org.apache.flink.api.connector.sink.Sink. It is much similar to the option b) and are supported since 1.13. It would still be supported in the next several releases and it also be able to be migrated to the option b) easily. Best, Yun ------------------Original Mail ------------------ Sender:Fuyao Li <fuyao...@oracle.com> Send Date:Thu Feb 10 07:01:51 2022 Recipients:user <user@flink.apache.org> Subject:Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink? Hello Community, I have two questions regarding Flink custom sink with EXACTLY_ONCE semantic. 1. I have a SDK that could publish messages based on HTTP (backed by Oracle Streaming Service --- very similar to Kafka). This will be my Flink application’s sink. Is it possible to use this SDK as sink with EXACTLY_ONCE semantic? HTTP is stateless here… If possible, what could be added in SDK to support EXACTLY_ONCE? 2. If it is possible for question 1, then I need to implement a custom sink for this. Which option should I use? * Option 1:TwoPhaseCommitSinkFunction<https://urldefense.com/v3/__https:/nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html__;!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXjIAX36vY$> * Option 2:StatefulSink<https://urldefense.com/v3/__https:/github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java__;!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXj5cKfxA0$> + TwoPhaseCommittingSink<https://urldefense.com/v3/__https:/github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java__;!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXjA088CIg$> The legacy FlinkKafkaProducer seems to be using option (a) ---- This will be removed from Flink in the future. The newKafkaSink<https://urldefense.com/v3/__https:/nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/*kafka-sink__;Iw!!ACWV5N9M2RV99hQ!ckD7MXv5zSgCh50yzgUgsG8AZAcjIVIFyjt7kyfvHveN21sjsr2gOqXj02BG7zk$> seems to be using option (b). Based on the comment in the code, it seems option (a) is recommended, which one should I use? Please suggest if I am missing anything, or any other better solutions in my case? Thanks, Fuyao