Hi, Anil.
For your scenario, I think reading FLIP-143 first and then FLIP-191 will
give you a good understanding. After that, you can look at the other FLIPs
or at specific implementations.

Anil Dasari <adas...@guidewire.com> wrote on Tuesday, October 15, 2024 at 00:55:

> Got it, thanks.
> The sink improvements span several FLIP Confluence pages, i.e., FLIP-143,
> 171, 177, and 191. Is there a flow chart or sequence of steps that gives a
> better understanding of the sink process with the sink, writer, and committer?
>
> Thanks
>
> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>
>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0 and replaced by
>> the SupportsCommitter[1] interface, which was introduced in Flink 1.19.
>> But you can still use TwoPhaseCommittingSink on Flink versions before 2.0;
>> it depends on your target Flink version, and the interfaces of the two
>> APIs are almost identical.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
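>>
>> If you target 1.19+, the shape of the new-style sink is roughly the
>> following. This is a minimal sketch against the 1.19 sink2 interfaces as I
>> understand them; Record, S3Committable, CategoryWriter, CategoryCommitter,
>> and S3CommittableSerializer are hypothetical names for your own types:
>>
>> import java.io.IOException;
>> import org.apache.flink.api.connector.sink2.Committer;
>> import org.apache.flink.api.connector.sink2.CommitterInitContext;
>> import org.apache.flink.api.connector.sink2.Sink;
>> import org.apache.flink.api.connector.sink2.SinkWriter;
>> import org.apache.flink.api.connector.sink2.SupportsCommitter;
>> import org.apache.flink.api.connector.sink2.WriterInitContext;
>> import org.apache.flink.core.io.SimpleVersionedSerializer;
>>
>> // A sink that writes data and commits it in a second phase (1.19+ style).
>> public class CategorySink implements Sink<Record>, SupportsCommitter<S3Committable> {
>>
>>     @Override
>>     public SinkWriter<Record> createWriter(WriterInitContext context) throws IOException {
>>         // The writer should implement CommittingSinkWriter<Record, S3Committable>
>>         // so that prepareCommit() can hand committables to the committer.
>>         return new CategoryWriter();
>>     }
>>
>>     @Override
>>     public Committer<S3Committable> createCommitter(CommitterInitContext context) throws IOException {
>>         return new CategoryCommitter();
>>     }
>>
>>     @Override
>>     public SimpleVersionedSerializer<S3Committable> getCommittableSerializer() {
>>         // Committables are stored in checkpoints, so they need a serializer.
>>         return new S3CommittableSerializer();
>>     }
>> }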
>>
>> Anil Dasari <adas...@guidewire.com> wrote on Monday, October 14, 2024 at 23:26:
>>
>>> Hi Yanquan,
>>> Thanks for sharing the information.
>>> It appears that TwoPhaseCommittingSink is no longer available on the main
>>> branch of the Flink repo. Has it been replaced with Sink, Committer, and
>>> SinkWriter?
>>>
>>> Thanks
>>>
>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>>
>>>> Hi, Anil.
>>>>
>>>> The Iceberg Sink was merged recently in
>>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.
>>>>
>>>> From your description, I guess what you need is
>>>> a TwoPhaseCommittingSink[1]. The steps you listed can be implemented as
>>>> follows:
>>>>
>>>> > 1. Group data by category and write it to S3 under its respective
>>>> prefix.
>>>> This can be done in the
>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>> method.
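>>>>
>>>> For example, here is a minimal sketch of the writer side. It assumes a
>>>> hypothetical Record type with a getCategory() accessor; the actual S3
>>>> upload is deferred until checkpoint time:
>>>>
>>>> // Inside a PrecommittingSinkWriter<Record, S3Committable> implementation:
>>>> private final Map<String, List<Record>> buffers = new HashMap<>();
>>>>
>>>> @Override
>>>> public void write(Record element, Context context) {
>>>>     // Group records by category; uploading happens later in prepareCommit().
>>>>     buffers.computeIfAbsent(element.getCategory(), k -> new ArrayList<>())
>>>>             .add(element);
>>>> }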
>>>>
>>>> > 2. Update category metrics in a manifest file stored in the S3
>>>> manifest prefix.
>>>> > 3. Send category metrics to an external system to initiate
>>>> consumption.
>>>> This metrics information can be passed along via the
>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>>> method, and `Update category metrics`/`Send category metrics` can be done
>>>> in the org.apache.flink.api.connector.sink2.Committer#commit method.
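>>>>
>>>> For example, a sketch of prepareCommit, where S3Committable and the
>>>> uploadToS3 helper are hypothetical; each committable carries the written
>>>> file path plus the category metrics that steps 2 and 3 need:
>>>>
>>>> @Override
>>>> public Collection<S3Committable> prepareCommit() throws IOException {
>>>>     // Called on checkpoint: upload the buffered data and emit one
>>>>     // committable per category for the Committer to act on.
>>>>     List<S3Committable> committables = new ArrayList<>();
>>>>     for (Map.Entry<String, List<Record>> e : buffers.entrySet()) {
>>>>         String path = uploadToS3(e.getKey(), e.getValue()); // hypothetical helper
>>>>         committables.add(new S3Committable(path, e.getKey(), e.getValue().size()));
>>>>     }
>>>>     buffers.clear();
>>>>     return committables;
>>>> }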
>>>>
>>>> The rollback action can be done in the SinkWriter or the Committer. To
>>>> delete files from S3, you need to pass the file information through
>>>> PrecommittingSinkWriter#prepareCommit as well. Then you can throw an
>>>> exception to make the Flink job fail over and retry.
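>>>>
>>>> Putting the committer side together, a sketch in which updateManifest,
>>>> notifyExternalSystem, and deleteFromS3 are hypothetical helpers:
>>>>
>>>> @Override
>>>> public void commit(Collection<CommitRequest<S3Committable>> requests) {
>>>>     for (CommitRequest<S3Committable> request : requests) {
>>>>         S3Committable c = request.getCommittable();
>>>>         try {
>>>>             updateManifest(c);       // step 2: update metrics in the manifest
>>>>             notifyExternalSystem(c); // step 3: initiate consumption
>>>>         } catch (Exception e) {
>>>>             deleteFromS3(c.getPath()); // roll back this committable's files
>>>>             request.signalFailedWithKnownReason(e); // or retryLater() if transient
>>>>         }
>>>>     }
>>>> }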
>>>>
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>>>
>>>>
>>>>
>>>> On October 14, 2024 at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am looking to implement a Flink sink for the following use case,
>>>> where the steps below are executed for each microbatch (using Spark
>>>> terminology) or trigger:
>>>>
>>>>    1. Group data by category and write it to S3 under its respective
>>>>    prefix.
>>>>    2. Update category metrics in a manifest file stored in the S3
>>>>    manifest prefix.
>>>>    3. Send category metrics to an external system to initiate
>>>>    consumption.
>>>>
>>>> In case of failure, any previously completed steps should be rolled
>>>> back, i.e., the files written to S3 are deleted and the entire microbatch
>>>> is reprocessed.
>>>>
>>>> It seems this pattern can be implemented using the Unified Sink API, as
>>>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
>>>>
>>>> I'm currently reviewing FileSink and IcebergSink (still searching for
>>>> the source code) to understand their implementations and create a new one.
>>>>
>>>> Are there any step-by-step docs or examples available for writing a new
>>>> unified sink?
>>>>
>>>> Thanks
>>>>
