Hi Yanquan,
I've finished reading the sink FLIPs and am now reviewing some of the sink implementations, such as TestSinkV2, to better understand the flow; I'll write a new one to experiment with. Are there Flink sink docs or flow diagrams comparable to the detailed source implementation docs at
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/ ?
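In the meantime, to check my understanding of the flow you described, here is a rough sketch of how I'm mapping the three steps onto TwoPhaseCommittingSink. Event, StagedCategoryFile and CategoryCommittableSerializer are placeholder types of mine, not Flink classes, so please correct me if I've misread the API (the committer half is further down, below the quoted thread):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;

/** Per-category committable: staged S3 objects plus the metrics for the manifest. */
class CategoryCommittable implements java.io.Serializable {
    final String category;
    final List<String> s3Objects; // objects staged under the category's S3 prefix
    final long recordCount;       // metric for the manifest / external system

    CategoryCommittable(String category, List<String> s3Objects, long recordCount) {
        this.category = category;
        this.s3Objects = s3Objects;
        this.recordCount = recordCount;
    }
}

class CategorySink implements TwoPhaseCommittingSink<Event, CategoryCommittable> {

    @Override
    public PrecommittingSinkWriter<Event, CategoryCommittable> createWriter(InitContext ctx) {
        return new CategoryWriter();
    }

    @Override
    public Committer<CategoryCommittable> createCommitter() {
        return new CategoryCommitter(); // sketched below the quoted thread
    }

    @Override
    public SimpleVersionedSerializer<CategoryCommittable> getCommittableSerializer() {
        return new CategoryCommittableSerializer(); // byte[] round-trip, omitted here
    }

    /** Step 1 runs per record; steps 2 and 3 are deferred to the committer. */
    static class CategoryWriter
            implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, CategoryCommittable> {

        private final Map<String, StagedCategoryFile> staged = new HashMap<>();

        @Override
        public void write(Event element, Context ctx) throws IOException {
            // Group by category and append to a staging file under that category's prefix.
            staged.computeIfAbsent(element.category(), StagedCategoryFile::new).append(element);
        }

        @Override
        public void flush(boolean endOfInput) throws IOException {
            // Staging files are sealed in prepareCommit(), so nothing to do here.
        }

        @Override
        public Collection<CategoryCommittable> prepareCommit() throws IOException {
            // Called on checkpoint: seal staged files, hand paths + metrics to the committer.
            List<CategoryCommittable> out = new ArrayList<>();
            for (StagedCategoryFile f : staged.values()) {
                out.add(f.seal()); // returns category, object keys, record count
            }
            staged.clear();
            return out;
        }

        @Override
        public void close() throws Exception {
            // On failure, delete any staged-but-uncommitted objects here (rollback).
        }
    }
}

The idea is that write() only stages data (step 1), and prepareCommit() hands the staged file paths plus metrics to the committer on each checkpoint, so steps 2 and 3 only happen once the data is durable.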
Thanks.

On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv <decq12y...@gmail.com> wrote:

> Hi, Anil.
> For your scenario, I think looking at FLIP-143 first and then FLIP-191
> should provide a better understanding. Then you can look at other FLIPs
> or specific implementations.
>
> Anil Dasari <adas...@guidewire.com> wrote on Tue, Oct 15, 2024 at 00:55:
>
>> Got it, thanks.
>> The sink improvements span many FLIP Confluence pages, i.e. FLIP-143,
>> 171, 177 and 191. So, is there a step-by-step flow chart for a better
>> understanding of the sink process with sink, writer and committer?
>>
>> Thanks
>>
>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>
>>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will
>>> be replaced by the SupportsCommitter[1] interface, which was introduced
>>> in Flink 1.19.
>>> But you can still use TwoPhaseCommittingSink before Flink 2.0; it
>>> depends on your target Flink version. The interfaces of these two APIs
>>> are almost identical.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>>>
>>> Anil Dasari <adas...@guidewire.com> wrote on Mon, Oct 14, 2024 at 23:26:
>>>
>>>> Hi Yanquan,
>>>> Thanks for sharing the information.
>>>> It appears that TwoPhaseCommittingSink is not available in the Flink
>>>> repo main branch. Is it replaced with Sink, Committer and SinkWriter?
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, Anil.
>>>>>
>>>>> The Iceberg Sink was merged recently in
>>>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>>>>> .
>>>>>
>>>>> From your description, I guess that what you need is
>>>>> a TwoPhaseCommittingSink[1]; the steps you listed can be implemented
>>>>> as follows:
>>>>>
>>>>> > 1. Group data by category and write it to S3 under its respective
>>>>> prefix.
>>>>> This can be done in the
>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>>> method.
>>>>>
>>>>> > 2. Update category metrics in a manifest file stored in the S3
>>>>> manifest prefix.
>>>>> > 3. Send category metrics to an external system to initiate
>>>>> consumption.
>>>>> This metrics information can be passed through the
>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>>>> method.
>>>>> And `Update category metrics`/`Send category metrics` can be done in
>>>>> the org.apache.flink.api.connector.sink2.Committer#commit method.
>>>>>
>>>>> Rollback actions can be done in the SinkWriter or Committer. To delete
>>>>> files from S3, you need to pass the file information through
>>>>> PrecommittingSinkWriter#prepareCommit too. Then you can throw an
>>>>> exception to let the Flink job fail over and retry.
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>>>>
>>>>> On Oct 14, 2024 at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am looking to implement a Flink sink for the following use case,
>>>>> where the steps below are executed for each microbatch (using Spark
>>>>> terminology) or trigger:
>>>>>
>>>>> 1. Group data by category and write it to S3 under its respective
>>>>> prefix.
>>>>> 2. Update category metrics in a manifest file stored in the S3
>>>>> manifest prefix.
>>>>> 3. Send category metrics to an external system to initiate
>>>>> consumption.
>>>>>
>>>>> In case of failure, any previously completed steps should be rolled
>>>>> back, i.e., delete the files from S3 and reprocess the entire
>>>>> microbatch.
>>>>>
>>>>> It seems this pattern can be implemented using the Unified Sink API,
>>>>> as discussed in this video:
>>>>> https://www.youtube.com/watch?v=0GVI25OEs4A
>>>>>
>>>>> I'm currently reviewing FileSink and IcebergSink (still searching for
>>>>> the source code) to understand their implementations and create a new
>>>>> one.
>>>>>
>>>>> Are there any step-by-step docs or examples available for writing a
>>>>> new unified sink?
>>>>>
>>>>> Thanks
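And, continuing the sketch from the top of this mail, here is the committer half, which picks up the CategoryCommittable instances produced by prepareCommit(). ManifestClient and ConsumptionNotifier are hypothetical stand-ins for the manifest update and the external-system trigger, not real APIs:

import java.util.Collection;

import org.apache.flink.api.connector.sink2.Committer;

class CategoryCommitter implements Committer<CategoryCommittable> {

    private final ManifestClient manifest = new ManifestClient();           // placeholder
    private final ConsumptionNotifier notifier = new ConsumptionNotifier(); // placeholder

    @Override
    public void commit(Collection<CommitRequest<CategoryCommittable>> requests) {
        for (CommitRequest<CategoryCommittable> request : requests) {
            CategoryCommittable c = request.getCommittable();
            try {
                manifest.updateMetrics(c.category, c.recordCount, c.s3Objects); // step 2
                notifier.signalReady(c.category, c.recordCount);                // step 3
            } catch (Exception e) {
                // Keep the committable so Flink retries it rather than dropping data.
                request.retryLater();
            }
        }
    }

    @Override
    public void close() {}
}

Does retryLater() look like the right recovery hook here, or would you just throw and rely on job failover, using the file info from prepareCommit for the S3 cleanup, as you described?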