Got it, thanks. The sink improvements span several FLIP Confluence pages, i.e., FLIP-143, FLIP-171, FLIP-177, and FLIP-191. Is there a sequence-of-steps flow chart for a better understanding of the sink process across the Sink, Writer, and Committer?
Thanks

On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:

> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0 and replaced by
> the SupportsCommitter[1] interface, which was introduced in Flink 1.19.
> But you can still use TwoPhaseCommittingSink on versions before Flink
> 2.0, depending on your target Flink version; the interfaces of the two
> APIs are almost identical.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>
> Anil Dasari <adas...@guidewire.com> wrote on Mon, Oct 14, 2024 at 23:26:
>
>> Hi Yanquan,
>> Thanks for sharing the information.
>> It appears that TwoPhaseCommittingSink is not available on the main
>> branch of the Flink repo. Has it been replaced with Sink, Committer,
>> and SinkWriter?
>>
>> Thanks
>>
>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>
>>> Hi, Anil.
>>>
>>> The Iceberg Sink was merged recently in
>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.
>>>
>>> From your description, I guess that what you need is a
>>> TwoPhaseCommittingSink[1]. The steps you listed can be executed as
>>> follows:
>>>
>>> > 1. Group data by category and write it to S3 under its respective
>>> prefix.
>>> This can be done in the
>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>> method.
>>>
>>> > 2. Update category metrics in a manifest file stored in the S3
>>> manifest prefix.
>>> > 3. Send category metrics to an external system to initiate
>>> consumption.
>>> This metrics information can be passed through the
>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>> method, and `Update category metrics` / `Send category metrics` can be
>>> done in the org.apache.flink.api.connector.sink2.Committer#commit
>>> method.
>>>
>>> The rollback action can be done in the SinkWriter or the Committer. To
>>> delete files from S3, you need to pass the file information through
>>> PrecommittingSinkWriter#prepareCommit as well; then you can throw an
>>> exception to make the Flink job fail over and retry.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
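To make the method mapping in the reply above concrete, here is a minimal sketch of such a sink against the TwoPhaseCommittingSink API roughly as it looks in Flink 1.18 (exact signatures differ between releases, and this is not how FileSink or IcebergSink actually implement it). Event, CategorySink, and the commented-out s3Client, manifestClient, and externalSystem calls are hypothetical placeholders:

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Hypothetical input record: a payload tagged with its category.
class Event implements Serializable {
    String category;
    String payload;
}

// The committable ties the three steps together: it carries the staged S3
// keys (needed for rollback) and the per-category metrics (needed for the
// manifest update and the external notification).
class CategoryCommittable implements Serializable {
    final List<String> writtenS3Keys = new ArrayList<>();
    final Map<String, Long> categoryMetrics = new HashMap<>();
}

class CategorySink implements TwoPhaseCommittingSink<Event, CategoryCommittable> {

    @Override
    public PrecommittingSinkWriter<Event, CategoryCommittable> createWriter(InitContext context) {
        return new CategoryWriter();
    }

    @Override
    public Committer<CategoryCommittable> createCommitter() {
        return new CategoryCommitter();
    }

    @Override
    public SimpleVersionedSerializer<CategoryCommittable> getCommittableSerializer() {
        throw new UnsupportedOperationException("serializer omitted for brevity");
    }
}

class CategoryWriter
        implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, CategoryCommittable> {

    private CategoryCommittable pending = new CategoryCommittable();

    @Override
    public void write(Event element, Context context) throws IOException {
        // Step 1: group by category and stage the data under its S3 prefix.
        String key = "data/" + element.category + "/" + UUID.randomUUID();
        // s3Client.put(key, element.payload);   // hypothetical S3 call
        pending.writtenS3Keys.add(key);
        pending.categoryMetrics.merge(element.category, 1L, Long::sum);
    }

    @Override
    public void flush(boolean endOfInput) {}

    @Override
    public Collection<CategoryCommittable> prepareCommit() {
        // On checkpoint, hand the staged keys and metrics to the committer.
        CategoryCommittable out = pending;
        pending = new CategoryCommittable();
        return List.of(out);
    }

    @Override
    public void close() {}
}

class CategoryCommitter implements Committer<CategoryCommittable> {

    @Override
    public void commit(Collection<CommitRequest<CategoryCommittable>> requests) {
        for (CommitRequest<CategoryCommittable> request : requests) {
            CategoryCommittable c = request.getCommittable();
            try {
                // Step 2: update the manifest file with category metrics.
                // manifestClient.update(c.categoryMetrics);   // hypothetical
                // Step 3: notify the external system to start consumption.
                // externalSystem.notify(c.categoryMetrics);   // hypothetical
            } catch (Exception e) {
                // Rollback: delete the staged files, then let Flink retry.
                // c.writtenS3Keys.forEach(s3Client::delete);  // hypothetical
                request.signalFailedWithKnownReason(e);
            }
        }
    }

    @Override
    public void close() {}
}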
>>> >>> I'm currently reviewing FileSink and IcebergSink (still searching for >>> the source code) to understand their implementations and create a new one. >>> >>> Are there any step-by-step docs or examples available for writing a new >>> unified sink? >>> >>> Thanks >>> >>> >>> >>>