Hi Yanquan,
What's the most effective approach to implementing micro batching (i.e.,
grouping a set of events into a time window) for writing each batch to a
single folder at the destination, while improving fault tolerance? Should
this be done with windowing and triggers, or by checkpointing processing
time?
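To make the comparison concrete, this is roughly the shape I have in mind
for the processing-time option (a minimal sketch, assuming the Sink V2
SinkWriter interface and the ProcessingTimeService exposed via
Sink.InitContext#getProcessingTimeService; FlushingSinkWriter,
BATCH_INTERVAL_MS and uploadBatch are made-up names, not Flink API):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class FlushingSinkWriter implements SinkWriter<String> {

    private static final long BATCH_INTERVAL_MS = 60_000L; // one batch per minute

    // obtained from Sink.InitContext#getProcessingTimeService() in Sink#createWriter
    private final ProcessingTimeService timeService;
    private final List<String> buffer = new ArrayList<>();

    public FlushingSinkWriter(ProcessingTimeService timeService) {
        this.timeService = timeService;
        scheduleNextFlush();
    }

    private void scheduleNextFlush() {
        long next = timeService.getCurrentProcessingTime() + BATCH_INTERVAL_MS;
        // Timer callbacks run in the task's mailbox thread, so no extra locking.
        timeService.registerProcessingTimer(next, ts -> {
            flushBuffer();
            scheduleNextFlush(); // re-arm for the next window
        });
    }

    @Override
    public void write(String element, Context context) {
        buffer.add(element); // buffer until the timer or a checkpoint fires
    }

    @Override
    public void flush(boolean endOfInput) throws IOException {
        // Called on every checkpoint and at end of input, which is what ties
        // the batching to Flink's fault-tolerance guarantees.
        flushBuffer();
    }

    private void flushBuffer() throws IOException {
        if (buffer.isEmpty()) {
            return;
        }
        // uploadBatch(buffer); // hypothetical: write one file under the target folder
        buffer.clear();
    }

    @Override
    public void close() {}
}

Flushing from flush(boolean endOfInput) as well means a checkpoint never
leaves records sitting in the buffer.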
Pseudo window + trigger implementation:
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/App.java#L97
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/CustomStatefulSinkWriter.java#L109

Checkpointing processing time:
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/CustomStatefulSinkWriter.java#L173

I see the file sink using processing time to commit the pending in-progress
files. Please note that my implementation is still incomplete; I am adding
one step at a time. Any feedback is appreciated. (I've also bottom-posted a
rough sketch of the two-phase flow Yanquan outlined earlier in this thread.)

Thanks.

On Mon, Oct 14, 2024 at 8:30 PM Yanquan Lv <decq12y...@gmail.com> wrote:

> Sorry, I couldn't find any clear and detailed user guidance other than
> the FLIPs in the official documentation either.
>
> On Oct 15, 2024, at 01:39, Anil Dasari <adas...@guidewire.com> wrote:
>
> Hi Yanquan,
> I've finished reading the sink FLIPs and am now reviewing some of the
> sink implementations, like TestSinkV2, to better understand the flow.
> I'll write a new one to experiment with.
> Are there Flink sink docs/flow diagrams, like the detailed source
> implementation docs at
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
> ?
>
> Thanks.
>
> On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>
>> Hi, Anil.
>> For your scenario, I think looking at FLIP-143 first and then FLIP-191
>> should provide a better understanding. After that you can look at the
>> other FLIPs or the specific implementations.
>>
>> On Tue, Oct 15, 2024 at 00:55, Anil Dasari <adas...@guidewire.com> wrote:
>>
>>> Got it, thanks.
>>> The sink improvements span several FLIP confluence pages, i.e.,
>>> FLIP-143, 171, 177 and 191. So, is there a step-by-step flow chart for
>>> a better understanding of the sink process with sink, writer and
>>> committer?
>>>
>>> Thanks
>>>
>>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>>
>>>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it
>>>> will be replaced by the SupportsCommitter[1] interface, which was
>>>> introduced in Flink 1.19.
>>>> But you can still use TwoPhaseCommittingSink before Flink 2.0; it
>>>> depends on your target Flink version. The interfaces of these two
>>>> APIs are almost identical.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>>>>
>>>> On Mon, Oct 14, 2024 at 23:26, Anil Dasari <adas...@guidewire.com> wrote:
>>>>
>>>>> Hi Yanquan,
>>>>> Thanks for sharing the information.
>>>>> It appears that TwoPhaseCommittingSink is not available on the main
>>>>> branch of the Flink repo. Has it been replaced with Sink, Committer
>>>>> and SinkWriter?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, Anil.
>>>>>>
>>>>>> The Iceberg Sink was merged recently in
>>>>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>>>>>> .
>>>>>>
>>>>>> From your description, I guess that what you need is a
>>>>>> TwoPhaseCommittingSink[1]; the steps you listed can be implemented
>>>>>> as follows:
>>>>>>
>>>>>> > 1. Group data by category and write it to S3 under its respective
>>>>>> prefix.
>>>>>> This can be done in the
>>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>>>> method.
>>>>>>
>>>>>> > 2. Update category metrics in a manifest file stored in the S3
>>>>>> manifest prefix.
>>>>>> > 3. Send category metrics to an external system to initiate
>>>>>> consumption.
>>>>>> This metrics information can be passed via the
>>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>>>>> method, and `Update category metrics`/`Send category metrics` can
>>>>>> be done in the org.apache.flink.api.connector.sink2.Committer#commit
>>>>>> method.
>>>>>>
>>>>>> The rollback action can be done in the SinkWriter or the Committer.
>>>>>> To delete files from S3, you need to pass the file information
>>>>>> through PrecommittingSinkWriter#prepareCommit too. Then you can
>>>>>> throw an exception to make the Flink job fail over and retry.
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>>>>>
>>>>>> On Oct 14, 2024, at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am looking to implement a Flink sink for the following use case,
>>>>>> where the steps below are executed for each microbatch (using Spark
>>>>>> terminology) or trigger:
>>>>>>
>>>>>> 1. Group data by category and write it to S3 under its respective
>>>>>> prefix.
>>>>>> 2. Update category metrics in a manifest file stored in the S3
>>>>>> manifest prefix.
>>>>>> 3. Send category metrics to an external system to initiate
>>>>>> consumption.
>>>>>>
>>>>>> In case of failure, any previously completed steps should be rolled
>>>>>> back, i.e., the files should be deleted from S3 and the entire
>>>>>> microbatch reprocessed.
>>>>>>
>>>>>> It seems this pattern can be implemented using the Unified Sink API,
>>>>>> as discussed in this video:
>>>>>> https://www.youtube.com/watch?v=0GVI25OEs4A
>>>>>>
>>>>>> I'm currently reviewing FileSink and IcebergSink (still searching
>>>>>> for the source code) to understand their implementations and create
>>>>>> a new one.
>>>>>>
>>>>>> Are there any step-by-step docs or examples available for writing a
>>>>>> new unified sink?
>>>>>>
>>>>>> Thanks
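
P.S. To check my understanding of the mapping above, here is a rough,
non-working sketch of the two-phase flow (assuming the
TwoPhaseCommittingSink API from Flink 1.15-1.19; CategoryBatch,
CategoryWriter, CategoryCommitter and the commented-out actions are my
hypothetical placeholders, not real Flink or Iceberg code):

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

// Committable: everything the committer needs to finish or roll back one category.
class CategoryBatch implements Serializable {
    String category;
    List<String> s3Files; // files staged under the category prefix (step 1)
    long recordCount;     // metrics for the manifest and the external system
}

class CategoryWriter
        implements TwoPhaseCommittingSink.PrecommittingSinkWriter<String, CategoryBatch> {

    @Override
    public void write(String element, Context context) throws IOException {
        // Step 1: group by category and stage the record into an in-progress S3 file.
    }

    @Override
    public Collection<CategoryBatch> prepareCommit() throws IOException {
        // Called on checkpoint: close the in-progress files and hand their
        // paths + metrics to the committer as committables.
        return new ArrayList<>();
    }

    @Override
    public void flush(boolean endOfInput) {}

    @Override
    public void close() {}
}

class CategoryCommitter implements Committer<CategoryBatch> {

    @Override
    public void commit(Collection<CommitRequest<CategoryBatch>> requests) throws IOException {
        for (CommitRequest<CategoryBatch> request : requests) {
            CategoryBatch batch = request.getCommittable();
            // Step 2: update the manifest file from batch.recordCount.
            // Step 3: notify the external system for batch.category.
            // On failure: delete batch.s3Files, then request.retryLater()
            // or throw to make the job fail over.
        }
    }

    @Override
    public void close() {}
}

The point of CategoryBatch is that everything needed to finish or undo a
batch (file paths and metrics) travels from prepareCommit() to the
committer, so a failed commit can delete the staged files and trigger a
retry, as Yanquan described.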