Hi Yanquan,

Thanks for sharing the information. It appears that TwoPhaseCommittingSink is no longer available in the Flink repo's main branch. Has it been replaced with Sink, Committer, and SinkWriter?
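For context, my current reading of main is that the same two-phase contract is now assembled from those pieces. Below is a minimal sketch of the wiring, assuming the FLIP-372-style interfaces (Sink, SupportsCommitter, CommittingSinkWriter) on the main branch; CategorySink is a placeholder name, the String type parameters are stand-ins, and the method bodies are stubbed. Please correct me if the signatures differ:

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Sketch: the two-phase contract is expressed by implementing Sink plus the
// SupportsCommitter mixin, with the writer implementing CommittingSinkWriter
// (its prepareCommit() plays the role of the old
// PrecommittingSinkWriter#prepareCommit).
public class CategorySink implements Sink<String>, SupportsCommitter<String> {

    @Override
    public CommittingSinkWriter<String, String> createWriter(WriterInitContext context) {
        throw new UnsupportedOperationException("writer omitted in this sketch");
    }

    @Override
    public Committer<String> createCommitter(CommitterInitContext context) {
        throw new UnsupportedOperationException("committer omitted in this sketch");
    }

    @Override
    public SimpleVersionedSerializer<String> getCommittableSerializer() {
        throw new UnsupportedOperationException("serializer omitted in this sketch");
    }
}

If that reading is right, the writer and committer logic themselves stay essentially the same as under TwoPhaseCommittingSink.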
Thanks

On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:

> Hi, Anil.
>
> The Iceberg Sink was merged recently in
> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.
>
> From your description, I guess that what you need is a
> TwoPhaseCommittingSink [1]. The steps you listed can be executed as
> follows:
>
> 1. Group data by category and write it to S3 under its respective prefix.
> This can be done in the
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
> method.
>
> 2. Update category metrics in a manifest file stored in the S3 manifest
> prefix.
>
> 3. Send category metrics to an external system to initiate consumption.
> This metrics information can be passed through the
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
> method, and both `Update category metrics` and `Send category metrics` can
> be done in the org.apache.flink.api.connector.sink2.Committer#commit
> method.
>
> The rollback action can be done in the SinkWriter or Committer. To delete
> files from S3, you need to pass the file information through
> PrecommittingSinkWriter#prepareCommit too. Then you can throw an exception
> to let the Flink job fail over and retry.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>
> > On Oct 14, 2024, at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
> >
> > Hello,
> >
> > I am looking to implement a Flink sink for the following use case, where
> > the steps below are executed for each microbatch (using Spark
> > terminology) or trigger:
> >
> > 1. Group data by category and write it to S3 under its respective
> > prefix.
> > 2. Update category metrics in a manifest file stored in the S3
> > manifest prefix.
> > 3. Send category metrics to an external system to initiate consumption.
> >
> > In case of failure, any previously completed steps should be rolled
> > back, i.e., delete the files from S3 and reprocess the entire
> > microbatch.
> >
> > It seems this pattern can be implemented using the Unified Sink API, as
> > discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
> >
> > I'm currently reviewing FileSink and IcebergSink (still searching for
> > the source code) to understand their implementations and create a new
> > one.
> >
> > Are there any step-by-step docs or examples available for writing a new
> > unified sink?
> >
> > Thanks
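To make the mapping in the quoted thread concrete, here is a minimal sketch against the Flink 1.20 TwoPhaseCommittingSink API that Yanquan linked. S3Helper, ManifestClient, and ExternalSystem are hypothetical stand-ins for real S3, manifest, and notification clients; MyEvent and CategoryCommittable are placeholder types; and the sink-level wiring (createWriter, createCommitter, getCommittableSerializer) is left out:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

// Placeholder record and committable types; a real committable carries
// whatever file paths and metrics the committer needs to finish or roll back.
record MyEvent(String category, byte[] payload) {}
record CategoryCommittable(String category, List<String> s3Paths, long recordCount) {}

// Hypothetical stand-ins for real S3, manifest, and notification clients.
final class S3Helper {
    static List<String> uploadToS3(String category, List<MyEvent> events) { return List.of(); }
    static void deleteFiles(List<String> paths) {}
}
final class ManifestClient { static void updateManifest(CategoryCommittable c) {} }
final class ExternalSystem { static void notifyMetrics(CategoryCommittable c) {} }

// Step 1: group records by category and write each group under its S3 prefix.
class CategoryWriter
        implements TwoPhaseCommittingSink.PrecommittingSinkWriter<MyEvent, CategoryCommittable> {

    private final Map<String, List<MyEvent>> buffer = new HashMap<>();
    private final List<CategoryCommittable> pending = new ArrayList<>();

    @Override
    public void write(MyEvent element, Context context) {
        buffer.computeIfAbsent(element.category(), k -> new ArrayList<>()).add(element);
    }

    @Override
    public void flush(boolean endOfInput) throws IOException {
        for (Map.Entry<String, List<MyEvent>> e : buffer.entrySet()) {
            List<String> paths = S3Helper.uploadToS3(e.getKey(), e.getValue());
            pending.add(new CategoryCommittable(e.getKey(), paths, e.getValue().size()));
        }
        buffer.clear();
    }

    @Override
    public Collection<CategoryCommittable> prepareCommit() {
        // Hand file paths and metrics to the committer; Flink checkpoints them.
        List<CategoryCommittable> out = new ArrayList<>(pending);
        pending.clear();
        return out;
    }

    @Override
    public void close() {}
}

// Steps 2 and 3: update the manifest and notify the external system.
class CategoryCommitter implements Committer<CategoryCommittable> {

    @Override
    public void commit(Collection<CommitRequest<CategoryCommittable>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<CategoryCommittable> request : requests) {
            CategoryCommittable c = request.getCommittable();
            try {
                ManifestClient.updateManifest(c); // step 2
                ExternalSystem.notifyMetrics(c);  // step 3
            } catch (Exception e) {
                // Rollback per the advice above: delete the written files, then
                // throw so the job fails over and the batch is reprocessed.
                S3Helper.deleteFiles(c.s3Paths());
                throw new IOException("Commit failed for category " + c.category(), e);
            }
        }
    }

    @Override
    public void close() {}
}

The key design point from the thread is that the committable must carry both the metrics and the written file paths, since the committer is the only place that can both finish the batch and undo it.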