Hi Yanquan,
Thanks for sharing the information.
It appears that TwoPhaseCommittingSink is not available in the main branch
of the Flink repo. Has it been replaced by Sink, Committer, and SinkWriter?
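
For reference, here is a minimal sketch of the write / prepareCommit / commit
flow as I understand it from the steps described in the quoted reply. It uses
plain Java stand-ins, not the real Flink interfaces: the class names (Writer,
Committer, Committable), the method signatures, and the "category/part-N" key
layout are all hypothetical, and no S3 or Flink calls are made.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoPhaseSketch {

    /** Hypothetical committable: one staged object plus its category metrics. */
    public static final class Committable {
        public final String category;
        public final String stagedKey;
        public final long recordCount;

        public Committable(String category, String stagedKey, long recordCount) {
            this.category = category;
            this.stagedKey = stagedKey;
            this.recordCount = recordCount;
        }
    }

    /** Stand-in for the writer side: groups records, then hands off committables. */
    public static final class Writer {
        // Sorted map so committables come out in a stable category order.
        private final Map<String, Long> counts = new TreeMap<>();

        public void write(String category, String record) {
            // A real writer would also append the record to a staged file
            // under the category prefix; here only the count is tracked.
            counts.merge(category, 1L, Long::sum);
        }

        public List<Committable> prepareCommit(long checkpointId) {
            List<Committable> out = new ArrayList<>();
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                String key = e.getKey() + "/part-" + checkpointId;
                out.add(new Committable(e.getKey(), key, e.getValue()));
            }
            counts.clear(); // the next batch starts from empty state
            return out;
        }
    }

    /** Stand-in for the committer side: updates the manifest from committables. */
    public static final class Committer {
        public final List<String> manifest = new ArrayList<>();

        public void commit(List<Committable> committables) {
            for (Committable c : committables) {
                // A real committer would write the manifest file and notify the
                // external system here, throwing an exception on failure.
                manifest.add(c.stagedKey + " records=" + c.recordCount);
            }
        }
    }

    public static void main(String[] args) {
        Writer writer = new Writer();
        writer.write("books", "r1");
        writer.write("toys", "r2");
        writer.write("books", "r3");

        Committer committer = new Committer();
        committer.commit(writer.prepareCommit(1L));
        committer.manifest.forEach(System.out::println);
    }
}
```

The point of the split is that the committables carry both the file locations
and the metrics, so the commit step has everything it needs to update the
manifest or to clean up the staged files.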

Thanks

On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:

> Hi, Anil.
>
> The Iceberg Sink was merged recently:
> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>
> From your description, I guess that what you need is
> a TwoPhaseCommittingSink[1]. The steps you listed can be implemented as
> follows:
>
> > 1. Group data by category and write it to S3 under its respective prefix.
> This can be done in the
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
> method.
>
> > 2. Update category metrics in a manifest file stored in the S3 manifest
> prefix.
> > 3. Send category metrics to an external system to initiate consumption.
> This metrics information can be passed through the
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
> method.
> `Update category metrics` and `Send category metrics` can then be done in the
> org.apache.flink.api.connector.sink2.Committer#commit method.
>
> Rollback can be done in the SinkWriter or the Committer. To delete files
> from S3, you need to pass the file information through
> PrecommittingSinkWriter#prepareCommit as well. Then you can throw an
> exception to make the Flink job fail over and retry.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>
>
>
> On Oct 14, 2024, at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>
> Hello,
>
> I am looking to implement a Flink sink for the following use case, where
> the steps below are executed for each microbatch (using Spark terminology)
> or trigger:
>
>    1. Group data by category and write it to S3 under its respective
>    prefix.
>    2. Update category metrics in a manifest file stored in the S3
>    manifest prefix.
>    3. Send category metrics to an external system to initiate consumption.
>
> In case of failure, any previously completed steps should be rolled back,
> i.e., delete files from S3 and reprocess the entire microbatch.
>
> It seems this pattern can be implemented using the Unified Sink API, as
> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
>
> I'm currently reviewing FileSink and IcebergSink (still searching for the
> source code) to understand their implementations and create a new one.
>
> Are there any step-by-step docs or examples available for writing a new
> unified sink?
>
> Thanks
>
>
>
>
