Hi, Anil. For your scenario, I think looking at FLIP-143 first and then FLIP-191 should provide a better understanding. Then you can look at other FLIPs or specific implementations.
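
To make the Sink / SinkWriter / Committer split from those FLIPs concrete
for your use case, here is a rough, untested sketch against the
TwoPhaseCommittingSink API discussed further down the thread. The
Event/Category* names are placeholders I made up, the actual S3, manifest,
and notification calls are elided, and you should double-check the exact
signatures against the javadoc for your target Flink version:

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/** Placeholder record type; assume each record knows its category. */
class Event implements Serializable {
    public String category;
}

/** One staged category file plus the metrics to publish when it commits. */
class CategoryCommittable implements Serializable {
    final String category;
    final String stagedS3Path; // written in phase 1, deletable on rollback
    final long recordCount;    // metric for the manifest / external system

    CategoryCommittable(String category, String stagedS3Path, long recordCount) {
        this.category = category;
        this.stagedS3Path = stagedS3Path;
        this.recordCount = recordCount;
    }
}

public class CategorySink implements TwoPhaseCommittingSink<Event, CategoryCommittable> {

    @Override
    public PrecommittingSinkWriter<Event, CategoryCommittable> createWriter(
            InitContext context) throws IOException {
        return new PrecommittingSinkWriter<Event, CategoryCommittable>() {
            private final List<CategoryCommittable> staged = new ArrayList<>();

            @Override
            public void write(Event element, Context context) {
                // Step 1: append the record to a staged S3 object under its
                // category's prefix (buffering / multipart upload elided).
            }

            @Override
            public void flush(boolean endOfInput) {
                // Close the staged files for this checkpoint and add one
                // CategoryCommittable per category to `staged`.
            }

            @Override
            public Collection<CategoryCommittable> prepareCommit() {
                // Hand everything staged since the last checkpoint to the
                // committer; Flink checkpoints these committables.
                List<CategoryCommittable> out = new ArrayList<>(staged);
                staged.clear();
                return out;
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public Committer<CategoryCommittable> createCommitter() throws IOException {
        return new Committer<CategoryCommittable>() {
            @Override
            public void commit(Collection<CommitRequest<CategoryCommittable>> requests) {
                for (CommitRequest<CategoryCommittable> request : requests) {
                    CategoryCommittable c = request.getCommittable();
                    try {
                        // Step 2: update the manifest in the S3 manifest prefix.
                        // Step 3: push c.category / c.recordCount to the
                        // external system to initiate consumption.
                    } catch (Exception e) {
                        // Rollback: delete c.stagedS3Path from S3 here if
                        // required, then let Flink retry this committable.
                        request.retryLater();
                    }
                }
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public SimpleVersionedSerializer<CategoryCommittable> getCommittableSerializer() {
        // A real implementation needs a stable binary format for recovery.
        throw new UnsupportedOperationException("sketch only");
    }
}

If you end up targeting Flink 1.19+ or 2.0, the same shape is expressed by
implementing Sink plus the SupportsCommitter interface mentioned below; as
noted there, the two APIs are almost identical, so the method bodies stay
essentially the same.
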
On Tue, Oct 15, 2024 at 12:55 AM Anil Dasari <adas...@guidewire.com> wrote:

> Got it, thanks.
> The sink improvements span several FLIP confluence pages, i.e. FLIP-143,
> 171, 177 and 191. So, is there a sequence of steps or a flow chart for a
> better understanding of the sink process with Sink, SinkWriter and
> Committer?
>
> Thanks
>
> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>
>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will be
>> replaced by the SupportsCommitter[1] interface, which was introduced in
>> Flink 1.19.
>> But you can still use TwoPhaseCommittingSink before Flink 2.0; it depends
>> on your target Flink version. The interfaces of these two APIs are almost
>> identical.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>>
>> On Mon, Oct 14, 2024 at 11:26 PM Anil Dasari <adas...@guidewire.com> wrote:
>>
>>> Hi Yanquan,
>>> Thanks for sharing the information.
>>> It appears that TwoPhaseCommittingSink is not available in the Flink
>>> repo main branch. Has it been replaced with Sink, Committer and
>>> SinkWriter?
>>>
>>> Thanks
>>>
>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>>
>>>> Hi, Anil.
>>>>
>>>> The Iceberg Sink was merged recently in
>>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>>>> .
>>>>
>>>> From your description, I guess that what you need is
>>>> a TwoPhaseCommittingSink[1]; the steps you listed can be implemented as
>>>> follows:
>>>>
>>>> > 1. Group data by category and write it to S3 under its respective
>>>> prefix.
>>>> This can be done in the
>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>> method.
>>>>
>>>> > 2. Update category metrics in a manifest file stored in the S3
>>>> manifest prefix.
>>>> > 3. Send category metrics to an external system to initiate
>>>> consumption.
>>>> This metrics information can be passed along via the
>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>>> method, and `Update category metrics`/`Send category metrics` can be
>>>> done in the org.apache.flink.api.connector.sink2.Committer#commit
>>>> method.
>>>>
>>>> The rollback action can be done in the SinkWriter or Committer. To
>>>> delete files from S3, you need to pass the file information through
>>>> PrecommittingSinkWriter#prepareCommit too. Then you can throw an
>>>> exception to let the Flink job fail over and retry.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
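
To make the two-phase timing concrete: prepareCommit() runs when the
checkpoint barrier reaches the writer, and commit() runs only after the
checkpoint completes, so the "microbatch" boundary is effectively the
checkpoint interval. A minimal job wiring for the CategorySink sketch
above (untested; it assumes Event and CategorySink are on the classpath,
and the interval is arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CategoryJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // The sink's two phases are driven by this interval: one
        // "microbatch" of staged files per completed checkpoint.
        env.enableCheckpointing(60_000);
        env.fromElements(new Event()) // stand-in for a real source
                .sinkTo(new CategorySink())
                .name("category-s3-sink");
        env.execute("category-sink-example");
    }
}
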
>>>>
>>>> On Oct 14, 2024, at 12:49 PM, Anil Dasari <adas...@guidewire.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am looking to implement a Flink sink for the following use case,
>>>> where the steps below are executed for each microbatch (using Spark
>>>> terminology) or trigger:
>>>>
>>>> 1. Group data by category and write it to S3 under its respective
>>>> prefix.
>>>> 2. Update category metrics in a manifest file stored in the S3
>>>> manifest prefix.
>>>> 3. Send category metrics to an external system to initiate
>>>> consumption.
>>>>
>>>> In case of failure, any previously completed steps should be rolled
>>>> back, i.e., the files deleted from S3 and the entire microbatch
>>>> reprocessed.
>>>>
>>>> It seems this pattern can be implemented using the Unified Sink API, as
>>>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
>>>>
>>>> I'm currently reviewing FileSink and IcebergSink (still searching for
>>>> the source code) to understand their implementations and create a new
>>>> one.
>>>>
>>>> Are there any step-by-step docs or examples available for writing a new
>>>> unified sink?
>>>>
>>>> Thanks