Hi Fuyao,

Logically, if a system wants to support end-to-end exactly-once, it should
support transactions:
1. A transaction holds the records; it is pre-committed when the state is
snapshotted and committed once the checkpoint succeeds.
2. A transaction must still be abortable after it has been pre-committed.
3. Once pre-committed, a transaction must always be committable: even if the
Flink job fails between pre-commit and commit, it must be possible to commit
the transaction again after the job restarts.
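
To make the three conditions concrete, here is a minimal sketch of the
contract the external system would need to offer. It is only an in-memory
stand-in, not Flink code and not the real SDK: the class TransactionalStore
and all of its method names are hypothetical, and a real system would have to
persist pre-committed transactions durably so that they survive a restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for a transactional external system.
// None of these names come from Flink or from any real SDK.
class TransactionalStore {
    private final Map<String, List<String>> open = new HashMap<>();
    private final Map<String, List<String>> preCommitted = new HashMap<>();
    private final Set<String> committedIds = new HashSet<>();
    private final List<String> visible = new ArrayList<>();

    void begin(String txnId) {
        open.put(txnId, new ArrayList<>());
    }

    void write(String txnId, String record) {
        List<String> buffer = open.get(txnId);
        if (buffer == null) {
            throw new IllegalStateException("No open transaction " + txnId);
        }
        buffer.add(record);
    }

    // Condition 1: called when the sink snapshots its state.
    // The records are held by the transaction but are not yet visible.
    void preCommit(String txnId) {
        List<String> buffer = open.remove(txnId);
        if (buffer == null) {
            throw new IllegalStateException("No open transaction " + txnId);
        }
        preCommitted.put(txnId, buffer);
    }

    // Condition 3: commit is idempotent, so it can be replayed after a
    // job restart without publishing the records twice.
    void commit(String txnId) {
        if (committedIds.contains(txnId)) {
            return; // already committed before the failure
        }
        List<String> records = preCommitted.remove(txnId);
        if (records == null) {
            throw new IllegalStateException(
                "Transaction " + txnId + " was never pre-committed");
        }
        visible.addAll(records);
        committedIds.add(txnId);
    }

    // Condition 2: abort works for open and for pre-committed transactions.
    void abort(String txnId) {
        open.remove(txnId);
        preCommitted.remove(txnId);
    }

    List<String> visibleRecords() {
        return new ArrayList<>(visible);
    }

    public static void main(String[] args) {
        TransactionalStore store = new TransactionalStore();
        store.begin("txn-1");
        store.write("txn-1", "a");
        store.preCommit("txn-1");   // on snapshot state
        store.commit("txn-1");      // on checkpoint success
        store.commit("txn-1");      // replayed after a simulated restart
        store.begin("txn-2");
        store.write("txn-2", "b");
        store.preCommit("txn-2");
        store.abort("txn-2");       // aborted after pre-commit
        System.out.println(store.visibleRecords()); // prints [a]
    }
}
```

If the SDK can expose something equivalent to preCommit, an idempotent
commit keyed by a transaction id, and abort, then the sink only has to store
the transaction ids in its checkpointed state.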

If the external system meets these conditions, option (b) is the recommended
way to implement an exactly-once sink. However, these interfaces are newly
added in the upcoming 1.15 release, and you might need to wait about 1.5
months for it to be released.

An earlier version of option (b) is org.apache.flink.api.connector.sink.Sink.
It is very similar to option (b) and has been supported since 1.13. It will
still be supported in the next several releases, and a sink written against
it can be migrated to option (b) easily.

Best,
Yun



 ------------------Original Mail ------------------
Sender:Fuyao Li <fuyao...@oracle.com>
Send Date:Thu Feb 10 07:01:51 2022
Recipients:user <user@flink.apache.org>
Subject:Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink 
to achieve EXACTLY_ONCE sink?

Hello Community,
I have two questions regarding a custom Flink sink with EXACTLY_ONCE semantics.

1. I have an SDK that can publish messages over HTTP (backed by Oracle
Streaming Service, which is very similar to Kafka). This will be my Flink
application's sink. Is it possible to use this SDK as a sink with EXACTLY_ONCE
semantics? HTTP is stateless here… If it is possible, what could be added to
the SDK to support EXACTLY_ONCE?
2. If the answer to question 1 is yes, then I need to implement a custom sink
for this. Which option should I use?
Option (a): TwoPhaseCommitSinkFunction
Option (b): StatefulSink + TwoPhaseCommittingSink
The legacy FlinkKafkaProducer seems to be using option (a), and it will be
removed from Flink in the future. The new KafkaSink seems to be using option
(b). Based on the comment in the code, it seems option (a) is recommended.
Which one should I use? Please let me know if I am missing anything, or if
there is a better solution for my case.
Thanks,
Fuyao
