Hi Yanquan,
What’s the most effective approach to implementing micro batching (i.e.,
grouping a set of events into a time window) for writing to a single folder
at the destination while preserving fault tolerance? Should this be done
using windowing and triggers, or by checkpointing processing time?

Pseudo window + trigger implementation:
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/App.java#L97
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/CustomStatefulSinkWriter.java#L109

Checkpointing process time:
https://github.com/adasari/mastering-flink/blob/main/app/src/main/java/org/example/customsink/CustomStatefulSinkWriter.java#L173

I see that FileSink uses processing time to commit the pending in-progress files.
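
For concreteness, the processing-time variant I'm experimenting with looks
roughly like the sketch below. This is only a sketch against the Sink V2
API: Event, BatchCommittable and the helper names are placeholders, and the
exact ProcessingTimeService signature should be checked against your Flink
version.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

public class MicroBatchingWriter implements
        TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, BatchCommittable> {

    private final ProcessingTimeService timeService; // from InitContext#getProcessingTimeService()
    private final long batchIntervalMs;

    private List<Event> currentBatch = new ArrayList<>();
    private final List<BatchCommittable> sealedBatches = new ArrayList<>();

    MicroBatchingWriter(ProcessingTimeService timeService, long batchIntervalMs) {
        this.timeService = timeService;
        this.batchIntervalMs = batchIntervalMs;
        scheduleNextRoll();
    }

    private void scheduleNextRoll() {
        long next = timeService.getCurrentProcessingTime() + batchIntervalMs;
        // The timer only seals the in-progress batch; nothing becomes visible
        // at the destination until the committer runs after a checkpoint.
        timeService.registerTimer(next, ts -> {
            sealCurrentBatch();
            scheduleNextRoll();
        });
    }

    private void sealCurrentBatch() {
        if (!currentBatch.isEmpty()) {
            sealedBatches.add(BatchCommittable.of(currentBatch)); // placeholder factory
            currentBatch = new ArrayList<>();
        }
    }

    @Override
    public void write(Event element, Context context) {
        currentBatch.add(element);
    }

    @Override
    public Collection<BatchCommittable> prepareCommit() {
        sealCurrentBatch();
        List<BatchCommittable> out = new ArrayList<>(sealedBatches);
        sealedBatches.clear();
        return out; // handed to the Committer once the checkpoint completes
    }

    @Override
    public void flush(boolean endOfInput) {
        if (endOfInput) {
            sealCurrentBatch();
        }
    }

    @Override
    public void close() {}
}

The timer defines the micro batch window, but the commit itself still rides
on the checkpoint, which is where the fault tolerance comes from.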

Please note that my implementation is still incomplete; I'm adding one step
at a time. Any feedback is appreciated.

Thanks.

On Mon, Oct 14, 2024 at 8:30 PM Yanquan Lv <decq12y...@gmail.com> wrote:

> Sorry, I couldn't find any clear and detailed user guidance other than the
> FLIPs in the official documentation either.
>
>
> On Oct 15, 2024 at 01:39, Anil Dasari <adas...@guidewire.com> wrote:
>
> Hi Yanquan,
> I've finished reading the Sink FLIPs and am now reviewing some of the sink
> implementations, like TestSinkV2, to better understand the flow. I'll write
> a new one to experiment with.
> Are there Flink sink docs/flow diagrams similar to the detailed source
> implementation docs, like
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
> ?
>
> Thanks.
>
> On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>
>> Hi, Anil.
>> For your scenario, I think looking at FLIP-143 first and then FLIP-191
>> should provide a better understanding. Then you can look at other FLIPs or
>> specific implementations.
>>
>> On Tue, Oct 15, 2024 at 00:55, Anil Dasari <adas...@guidewire.com> wrote:
>>
>>> Got it, thanks.
>>> The sink improvements span several FLIP Confluence pages, i.e., FLIP-143,
>>> 171, 177, and 191. Is there a step-by-step flow chart for a better
>>> understanding of the sink process with the sink, writer, and committer?
>>>
>>> Thanks
>>>
>>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>>
>>>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0 and replaced by
>>>> the SupportsCommitter[1] interface, which was introduced in Flink 1.19.
>>>> But you can still use TwoPhaseCommittingSink in versions below Flink 2.0;
>>>> it depends on your target Flink version. The interfaces of these two APIs
>>>> are almost identical.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
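>>>>
>>>> For example, the wiring on the new API looks roughly like the sketch
>>>> below (per the 1.19/1.20 Javadoc; MySink, MyWriter, MyCommitter,
>>>> MyCommittable and MyCommittableSerializer are all placeholders):
>>>>
>>>> import org.apache.flink.api.connector.sink2.Committer;
>>>> import org.apache.flink.api.connector.sink2.CommitterInitContext;
>>>> import org.apache.flink.api.connector.sink2.Sink;
>>>> import org.apache.flink.api.connector.sink2.SinkWriter;
>>>> import org.apache.flink.api.connector.sink2.SupportsCommitter;
>>>> import org.apache.flink.api.connector.sink2.WriterInitContext;
>>>> import org.apache.flink.core.io.SimpleVersionedSerializer;
>>>>
>>>> public class MySink implements Sink<Event>, SupportsCommitter<MyCommittable> {
>>>>
>>>>     @Override
>>>>     public SinkWriter<Event> createWriter(WriterInitContext context) {
>>>>         return new MyWriter(); // placeholder writer
>>>>     }
>>>>
>>>>     @Override
>>>>     public Committer<MyCommittable> createCommitter(CommitterInitContext context) {
>>>>         return new MyCommitter(); // placeholder committer
>>>>     }
>>>>
>>>>     @Override
>>>>     public SimpleVersionedSerializer<MyCommittable> getCommittableSerializer() {
>>>>         return new MyCommittableSerializer(); // placeholder serializer
>>>>     }
>>>> }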
>>>>
>>>> On Mon, Oct 14, 2024 at 23:26, Anil Dasari <adas...@guidewire.com> wrote:
>>>>
>>>>> Hi Yanquan,
>>>>> Thanks for sharing the information.
>>>>> It appears that TwoPhaseCommittingSink is not available on the Flink
>>>>> repo's main branch. Has it been replaced with Sink, Committer, and
>>>>> SinkWriter?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, Anil.
>>>>>>
>>>>>> The Iceberg sink was merged recently in
>>>>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>>>>>> .
>>>>>>
>>>>>> From your description, I guess what you need is a
>>>>>> TwoPhaseCommittingSink[1]; the steps you listed can be implemented as
>>>>>> follows:
>>>>>>
>>>>>> > 1. Group data by category and write it to S3 under its respective
>>>>>> prefix.
>>>>>> This can be done in the
>>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>>>> method.
>>>>>>
>>>>>> > 2. Update category metrics in a manifest file stored in the S3
>>>>>> manifest prefix.
>>>>>> > 3. Send category metrics to an external system to initiate
>>>>>> consumption.
>>>>>> This metrics information can be passed through the
>>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>>>>> method, and both the `Update category metrics` and `Send category metrics`
>>>>>> steps can be done in the
>>>>>> org.apache.flink.api.connector.sink2.Committer#commit method.
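>>>>>>
>>>>>> A rough sketch of the writer side (Event, WrittenFile,
>>>>>> CategoryCommittable and the S3 helpers are placeholders for your own
>>>>>> types):
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.Collection;
>>>>>> import java.util.HashMap;
>>>>>> import java.util.List;
>>>>>> import java.util.Map;
>>>>>>
>>>>>> import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
>>>>>>
>>>>>> public class CategorySinkWriter implements
>>>>>>         TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, CategoryCommittable> {
>>>>>>
>>>>>>     // in-progress files for the current checkpoint interval, keyed by category
>>>>>>     private final Map<String, WrittenFile> openFiles = new HashMap<>();
>>>>>>
>>>>>>     @Override
>>>>>>     public void write(Event element, Context context) throws IOException {
>>>>>>         // step 1: group by category and write under its S3 prefix
>>>>>>         openFiles.computeIfAbsent(element.getCategory(), this::newFileFor)
>>>>>>                 .append(element);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Collection<CategoryCommittable> prepareCommit() {
>>>>>>         // pass both the file paths and the per-category metrics onward;
>>>>>>         // the committer also needs the paths for rollback deletes
>>>>>>         List<CategoryCommittable> out = new ArrayList<>();
>>>>>>         openFiles.forEach((category, file) ->
>>>>>>                 out.add(new CategoryCommittable(category, file.path(), file.metrics())));
>>>>>>         openFiles.clear();
>>>>>>         return out;
>>>>>>     }
>>>>>>
>>>>>>     private WrittenFile newFileFor(String category) {
>>>>>>         // open an in-progress file under the category's S3 prefix (placeholder)
>>>>>>         throw new UnsupportedOperationException("TODO");
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void flush(boolean endOfInput) {}
>>>>>>
>>>>>>     @Override
>>>>>>     public void close() {}
>>>>>> }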
>>>>>>
>>>>>> The rollback action can be done in the SinkWriter or the Committer. To
>>>>>> delete files from S3, you need to pass the file information through
>>>>>> PrecommittingSinkWriter#prepareCommit as well. Then you can throw an
>>>>>> exception to let the Flink job fail over and retry.
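>>>>>>
>>>>>> And the committer side (again a sketch; updateManifest, notifyExternal
>>>>>> and deleteFromS3 are placeholders for your own logic):
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.util.Collection;
>>>>>>
>>>>>> import org.apache.flink.api.connector.sink2.Committer;
>>>>>>
>>>>>> public class CategoryCommitter implements Committer<CategoryCommittable> {
>>>>>>
>>>>>>     @Override
>>>>>>     public void commit(Collection<CommitRequest<CategoryCommittable>> requests)
>>>>>>             throws IOException, InterruptedException {
>>>>>>         for (CommitRequest<CategoryCommittable> request : requests) {
>>>>>>             CategoryCommittable c = request.getCommittable();
>>>>>>             try {
>>>>>>                 updateManifest(c); // step 2: update the manifest file in S3
>>>>>>                 notifyExternal(c); // step 3: signal the external system
>>>>>>             } catch (Exception e) {
>>>>>>                 deleteFromS3(c); // roll back the data files of this batch
>>>>>>                 // fail the job so Flink restores the last checkpoint and
>>>>>>                 // the whole micro batch is reprocessed
>>>>>>                 throw new IOException("Commit failed, rolled back S3 files", e);
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private void updateManifest(CategoryCommittable c) throws IOException {
>>>>>>         // write/refresh the manifest under the S3 manifest prefix (placeholder)
>>>>>>     }
>>>>>>
>>>>>>     private void notifyExternal(CategoryCommittable c) {
>>>>>>         // push the category metrics to the downstream system (placeholder)
>>>>>>     }
>>>>>>
>>>>>>     private void deleteFromS3(CategoryCommittable c) {
>>>>>>         // remove the data files referenced by the committable (placeholder)
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void close() {}
>>>>>> }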
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Oct 14, 2024 at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I am looking to implement a Flink sink for the following use case,
>>>>>> where the steps below are executed for each microbatch (using Spark
>>>>>> terminology) or trigger:
>>>>>>
>>>>>>    1. Group data by category and write it to S3 under its respective
>>>>>>    prefix.
>>>>>>    2. Update category metrics in a manifest file stored in the S3
>>>>>>    manifest prefix.
>>>>>>    3. Send category metrics to an external system to initiate
>>>>>>    consumption.
>>>>>>
>>>>>> In case of failure, any previously completed steps should be rolled
>>>>>> back, i.e., the files should be deleted from S3 and the entire
>>>>>> microbatch reprocessed.
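>>>>>>
>>>>>> For illustration, the unit of commit/rollback I have in mind is roughly
>>>>>> the following (names are placeholders):
>>>>>>
>>>>>> import java.io.Serializable;
>>>>>> import java.util.List;
>>>>>>
>>>>>> public final class CategoryBatchResult implements Serializable {
>>>>>>     private final String category;
>>>>>>     private final List<String> s3Paths; // files written for this category
>>>>>>     private final long recordCount;     // metric recorded in the manifest
>>>>>>
>>>>>>     public CategoryBatchResult(String category, List<String> s3Paths, long recordCount) {
>>>>>>         this.category = category;
>>>>>>         this.s3Paths = s3Paths;
>>>>>>         this.recordCount = recordCount;
>>>>>>     }
>>>>>>
>>>>>>     public String getCategory() { return category; }
>>>>>>     public List<String> getS3Paths() { return s3Paths; }
>>>>>>     public long getRecordCount() { return recordCount; }
>>>>>> }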
>>>>>>
>>>>>> It seems this pattern can be implemented using the Unified Sink API,
>>>>>> as discussed in this video:
>>>>>> https://www.youtube.com/watch?v=0GVI25OEs4A
>>>>>> .
>>>>>>
>>>>>> I'm currently reviewing FileSink and IcebergSink (still searching
>>>>>> for the source code) to understand their implementations and create a new
>>>>>> one.
>>>>>>
>>>>>> Are there any step-by-step docs or examples available for writing a
>>>>>> new unified sink?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>
