Got it, thanks.
The sink improvements span several FLIP Confluence pages, i.e. FLIP-143, 171, 177,
and 191. Is there a sequence diagram or flow chart that would make it easier to
understand how the sink, writer, and committer interact in the sink process?

Thanks

On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:

> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0 and replaced by the
> SupportsCommitter[1] interface, which was introduced in Flink 1.19.
> But you can still use TwoPhaseCommittingSink on versions before Flink 2.0,
> depending on your target Flink version. The interfaces of these two APIs are
> almost identical.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
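>
> For illustration, a minimal sketch of the 1.19+ shape (only the Flink
> interfaces are real; the Category* and Event names are placeholders, not
> actual connector code):
>
> import java.io.IOException;
> import org.apache.flink.api.connector.sink2.Committer;
> import org.apache.flink.api.connector.sink2.CommitterInitContext;
> import org.apache.flink.api.connector.sink2.Sink;
> import org.apache.flink.api.connector.sink2.SinkWriter;
> import org.apache.flink.api.connector.sink2.SupportsCommitter;
> import org.apache.flink.api.connector.sink2.WriterInitContext;
> import org.apache.flink.core.io.SimpleVersionedSerializer;
>
> public class CategorySink
>     implements Sink<Event>, SupportsCommitter<CategoryCommittable> {
>
>   @Override
>   public SinkWriter<Event> createWriter(WriterInitContext context)
>       throws IOException {
>     // The writer must implement CommittingSinkWriter<Event, CategoryCommittable>
>     // so it can hand committables to the committer at checkpoints.
>     return new CategorySinkWriter();
>   }
>
>   @Override
>   public Committer<CategoryCommittable> createCommitter(
>       CommitterInitContext context) {
>     // The committer finalizes the committables in the second phase.
>     return new CategoryCommitter();
>   }
>
>   @Override
>   public SimpleVersionedSerializer<CategoryCommittable> getCommittableSerializer() {
>     // Committables are stored in checkpoints, so they need a serializer.
>     return new CategoryCommittableSerializer();
>   }
> }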
>
> On Mon, Oct 14, 2024 at 23:26, Anil Dasari <adas...@guidewire.com> wrote:
>
>> Hi Yanquan,
>> Thanks for sharing the information.
>> It appears that TwoPhaseCommittingSink is no longer available on the Flink
>> repo main branch. Has it been replaced with Sink, Committer, and SinkWriter?
>>
>> Thanks
>>
>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>
>>> Hi, Anil.
>>>
>>> The Iceberg Sink was merged recently in
>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.
>>>
>>> From your description, I guess that what you need is
>>> a TwoPhaseCommittingSink[1]. The steps you listed can be implemented as
>>> follows:
>>>
>>> > 1. Group data by category and write it to S3 under its respective
>>> > prefix.
>>> This can be done in the
>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>> method.
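>>>
>>> A minimal sketch of such a writer (Event, CategoryCommittable and
>>> CategoryFileWriter are hypothetical names I'm using for illustration,
>>> not actual connector code):
>>>
>>> import java.io.IOException;
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
>>>
>>> public class CategorySinkWriter
>>>     implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Event, CategoryCommittable> {
>>>
>>>   // One in-progress file per category, keyed by category name.
>>>   private final Map<String, CategoryFileWriter> openFiles = new HashMap<>();
>>>
>>>   @Override
>>>   public void write(Event element, Context context) throws IOException {
>>>     // Route each record to the file for its category (its S3 prefix).
>>>     openFiles
>>>         .computeIfAbsent(element.category(), CategoryFileWriter::new)
>>>         .append(element);
>>>   }
>>>
>>>   @Override
>>>   public void flush(boolean endOfInput) throws IOException {
>>>     // Make sure buffered records reach the files before a commit.
>>>     for (CategoryFileWriter writer : openFiles.values()) {
>>>       writer.flush();
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void close() throws Exception {
>>>     for (CategoryFileWriter writer : openFiles.values()) {
>>>       writer.close();
>>>     }
>>>   }
>>>
>>>   // prepareCommit() is shown below.
>>> }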
>>>
>>> > 2. Update category metrics in a manifest file stored in the S3
>>> > manifest prefix.
>>> > 3. Send category metrics to an external system to initiate consumption.
>>> This metrics information can be passed through the
>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>> method, and both `Update category metrics` and `Send category metrics` can
>>> be done in the org.apache.flink.api.connector.sink2.Committer#commit method.
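>>>
>>> Continuing the sketch, prepareCommit() could seal the per-category files
>>> and return one committable per category carrying the file paths and
>>> metrics; the committer then performs steps 2 and 3. ManifestStore,
>>> ExternalNotifier and closeAndSummarize() are hypothetical helpers:
>>>
>>> import java.io.IOException;
>>> import java.util.ArrayList;
>>> import java.util.Collection;
>>> import org.apache.flink.api.connector.sink2.Committer;
>>>
>>> // In CategorySinkWriter:
>>> @Override
>>> public Collection<CategoryCommittable> prepareCommit() throws IOException {
>>>   Collection<CategoryCommittable> committables = new ArrayList<>();
>>>   for (CategoryFileWriter writer : openFiles.values()) {
>>>     // closeAndSummarize() returns the written file paths plus metrics.
>>>     committables.add(writer.closeAndSummarize());
>>>   }
>>>   openFiles.clear();
>>>   return committables;
>>> }
>>>
>>> public class CategoryCommitter implements Committer<CategoryCommittable> {
>>>
>>>   private final ManifestStore manifestStore;     // hypothetical helper
>>>   private final ExternalNotifier externalSystem; // hypothetical helper
>>>
>>>   public CategoryCommitter(ManifestStore store, ExternalNotifier notifier) {
>>>     this.manifestStore = store;
>>>     this.externalSystem = notifier;
>>>   }
>>>
>>>   @Override
>>>   public void commit(Collection<CommitRequest<CategoryCommittable>> requests)
>>>       throws IOException, InterruptedException {
>>>     for (CommitRequest<CategoryCommittable> request : requests) {
>>>       CategoryCommittable committable = request.getCommittable();
>>>       // Step 2: update the category metrics in the S3 manifest prefix.
>>>       manifestStore.updateMetrics(committable);
>>>       // Step 3: notify the external system to start consumption.
>>>       externalSystem.notify(committable);
>>>     }
>>>   }
>>> }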
>>>
>>> The rollback action can be done in the SinkWriter or the Committer. To
>>> delete files from S3, you also need to pass the file information through
>>> PrecommittingSinkWriter#prepareCommit. You can then throw an exception to
>>> make the Flink job fail over and retry.
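>>>
>>> For example, the commit loop above could be wrapped like this
>>> (deleteFiles() and files() are hypothetical, using the file list carried
>>> in the committable):
>>>
>>> for (CommitRequest<CategoryCommittable> request : requests) {
>>>   try {
>>>     manifestStore.updateMetrics(request.getCommittable());
>>>     externalSystem.notify(request.getCommittable());
>>>   } catch (Exception e) {
>>>     // Roll back: remove the data files this committable wrote to S3.
>>>     deleteFiles(request.getCommittable().files());
>>>     // Rethrow so the job fails over and the microbatch is reprocessed
>>>     // from the last checkpoint; alternatively, request.retryLater()
>>>     // retries only this committable without a full failover.
>>>     throw new IOException("Commit failed, rolled back S3 files", e);
>>>   }
>>> }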
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>>
>>>
>>>
>>> On Mon, Oct 14, 2024 at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>>>
>>> Hello,
>>>
>>> I am looking to implement a Flink sink for the following use case, where
>>> the steps below are executed for each microbatch (using Spark terminology)
>>> or trigger:
>>>
>>>    1. Group data by category and write it to S3 under its respective
>>>    prefix.
>>>    2. Update category metrics in a manifest file stored in the S3
>>>    manifest prefix.
>>>    3. Send category metrics to an external system to initiate
>>>    consumption.
>>>
>>> In case of failure, any previously completed steps should be rolled back,
>>> i.e., the files should be deleted from S3 and the entire microbatch
>>> reprocessed.
>>>
>>> It seems this pattern can be implemented using the Unified Sink API, as
>>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
>>>
>>> I'm currently reviewing FileSink and IcebergSink (still searching for
>>> the source code) to understand their implementations and create a new one.
>>>
>>> Are there any step-by-step docs or examples available for writing a new
>>> unified sink?
>>>
>>> Thanks
>>>
