Yes, TwoPhaseCommittingSink will be removed in Flink 2.0 and replaced by the SupportsCommitter[1] interface, which was introduced in Flink 1.19. You can still use TwoPhaseCommittingSink in versions before Flink 2.0; which one to choose depends on your target Flink version. The interfaces of the two APIs are almost identical.
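To make the flow concrete, here is a minimal, dependency-free Java sketch of the write / prepareCommit / commit sequence for the use case in this thread. It does not use the Flink API: CategoryWriter, ManifestCommitter, CategoryCommittable, the failNotify flag and the "category/part-N" key layout are all hypothetical names that only mirror the roles of the writer and the committer, and S3, the manifest and the external system are replaced by in-memory collections. Treat it as a sketch of the pattern, not as how a real sink must be written.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free sketch. These types only mirror the roles of a precommitting
// writer and a committer; they are NOT Flink classes.
final class TwoPhaseSketch {

    // Hypothetical committable: staged files of one category plus its record count.
    static final class CategoryCommittable {
        final String category;
        final List<String> stagedFiles;
        final long recordCount;

        CategoryCommittable(String category, List<String> stagedFiles, long recordCount) {
            this.category = category;
            this.stagedFiles = stagedFiles;
            this.recordCount = recordCount;
        }
    }

    // Writer role: write() groups records by category, prepareCommit() hands
    // file names and metrics over to the committer.
    static final class CategoryWriter {
        private final Map<String, List<String>> recordsByCategory = new LinkedHashMap<>();
        private int checkpoint = 0;

        void write(String category, String record) {
            recordsByCategory.computeIfAbsent(category, k -> new ArrayList<>()).add(record);
        }

        List<CategoryCommittable> prepareCommit() {
            checkpoint++;
            List<CategoryCommittable> out = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : recordsByCategory.entrySet()) {
                // Assumed key layout; a real writer would have uploaded this file already.
                String file = e.getKey() + "/part-" + checkpoint;
                out.add(new CategoryCommittable(e.getKey(), List.of(file), e.getValue().size()));
            }
            recordsByCategory.clear();
            return out;
        }
    }

    // Committer role: update the manifest, notify the external system, and on
    // failure undo the manifest change, delete staged files, and rethrow.
    static final class ManifestCommitter {
        final Map<String, Long> manifest = new LinkedHashMap<>();
        final List<String> notifications = new ArrayList<>();
        final List<String> deletedFiles = new ArrayList<>();

        // failNotify simulates the external system being unavailable.
        void commit(Collection<CategoryCommittable> committables, boolean failNotify) {
            Map<String, Long> before = new LinkedHashMap<>(manifest);
            try {
                for (CategoryCommittable c : committables) {
                    manifest.merge(c.category, c.recordCount, Long::sum);
                }
                if (failNotify) {
                    throw new IllegalStateException("external system unavailable");
                }
                for (CategoryCommittable c : committables) {
                    notifications.add(c.category + "=" + c.recordCount);
                }
            } catch (RuntimeException ex) {
                manifest.clear();
                manifest.putAll(before);
                for (CategoryCommittable c : committables) {
                    deletedFiles.addAll(c.stagedFiles);
                }
                throw ex; // in a real job, the exception triggers failover and retry
            }
        }
    }

    public static void main(String[] args) {
        CategoryWriter writer = new CategoryWriter();
        writer.write("books", "r1");
        writer.write("toys", "r2");
        ManifestCommitter committer = new ManifestCommitter();
        committer.commit(writer.prepareCommit(), false);
        System.out.println(committer.manifest);
    }
}
```

The point of the sketch is that everything the committer needs for both the commit and the rollback (file names and metrics) travels inside the committable returned by prepareCommit.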
[1] https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html

On Mon, Oct 14, 2024 at 23:26, Anil Dasari <adas...@guidewire.com> wrote:

> Hi Yanquan,
> Thanks for sharing the information.
> It appears that TwoPhaseCommittingSink is not available in the flink repo
> main branch. Is it replaced with Sink, Committer and SinkWriter?
>
> Thanks
>
> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>
>> Hi, Anil.
>>
>> Iceberg Sink was merged recently in
>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>> .
>>
>> From your description, I guess that what you need is
>> a TwoPhaseCommittingSink[1]. The steps you listed can be implemented as
>> follows:
>>
>> > 1. Group data by category and write it to S3 under its respective
>> prefix.
>> This can be done in the
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>> method.
>>
>> > 2. Update category metrics in a manifest file stored in the S3 manifest
>> prefix.
>> > 3. Send category metrics to an external system to initiate consumption.
>> The metrics information can be passed on by the
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>> method.
>> `Update category metrics`/`Send category metrics` can then be done in the
>> org.apache.flink.api.connector.sink2.Committer#commit method.
>>
>> The rollback action can be done in the SinkWriter or the Committer. To
>> delete files from S3, you need to pass the file information through
>> PrecommittingSinkWriter#prepareCommit too. Then you can throw an exception
>> to let the Flink job fail over and retry.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>
>>
>> On Oct 14, 2024, at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>>
>> Hello,
>>
>> I am looking to implement a Flink sink for the following use case, where
>> the steps below are executed for each microbatch (using Spark terminology)
>> or trigger:
>>
>> 1. Group data by category and write it to S3 under its respective
>> prefix.
>> 2. Update category metrics in a manifest file stored in the S3
>> manifest prefix.
>> 3. Send category metrics to an external system to initiate
>> consumption.
>>
>> In case of failure, any previously completed steps should be rolled back,
>> i.e., delete files from S3 and reprocess the entire microbatch.
>>
>> It seems this pattern can be implemented using the Unified Sink API, as
>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
>>
>> I'm currently reviewing FileSink and IcebergSink (still searching for
>> the source code) to understand their implementations and create a new one.
>>
>> Are there any step-by-step docs or examples available for writing a new
>> unified sink?
>>
>> Thanks
>>