Hello all,

Are there Flink patterns that support microbatching and ensure all data for
a microbatch is written to a specific prefix in the destination with
exactly-once delivery?
I’ve explored both the window and the processing-time (as implemented in
FileSink) approaches to set the boundaries for microbatches.

However, processing time can't establish data boundaries by timestamp
across multiple sink writer tasks when each writer starts at a different
time due to Flink's auto-scaling. If I understand correctly, the
processing-time approach operates on a fixed interval within a single sink
writer (e.g., FileSink) rather than as a fixed-interval task shared across
all sink writers.
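
For reference, this is roughly the processing-time setup I tried, as a
minimal sketch (the path and interval are only illustrative). The rollover
clock runs per sink-writer subtask, which is why the boundaries don't line
up across writers:

import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class ProcessingTimeFileSink {

    public static FileSink<String> build() {
        return FileSink
            .forRowFormat(new Path("s3://bucket/output"),
                          new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(
                DefaultRollingPolicy.builder()
                    // Each writer rolls its part files every 5 minutes of its
                    // own processing time, independently of the other writers.
                    .withRolloverInterval(Duration.ofMinutes(5))
                    .build())
            .build();
    }
}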

The window option requires checking both the current and previous element's
window start times to determine the boundaries, complicating the data flush
process.
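
For context, a minimal sketch of what I mean, assuming the window start is
attached to each record upstream (e.g., in a ProcessWindowFunction) and used
by a custom BucketAssigner; WindowedRecord and getWindowStart() are made-up
names:

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Illustrative record type carrying the start of the window it was assigned to.
interface WindowedRecord {
    long getWindowStart();
}

// Routes every record of the same window to the same prefix. A writer only
// notices that a window has ended when the bucket id of the current record
// differs from the previous one, which is what complicates the flush.
public class WindowStartBucketAssigner implements BucketAssigner<WindowedRecord, String> {

    @Override
    public String getBucketId(WindowedRecord element, Context context) {
        return "batch-" + element.getWindowStart();
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}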
Is there a way to listen for window start/end events in all pipeline
operators downstream of the window operator, so that this could effectively
be implemented like an onProcessingTime callback in FileSink?

If so, could you please point me to any docs or examples? Thanks in
advance.

Thanks

On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:

> Hi, Anil.
>
> The Iceberg Sink was merged recently in
> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>
> From your description, I guess that what you need is
> a TwoPhaseCommittingSink[1]. The steps you listed can be executed as follows:
>
> > 1. Group data by category and write it to S3 under its respective prefix.
> This can be done in the
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
> method.
>
> > 2. Update category metrics in a manifest file stored in the S3 manifest
> prefix.
> > 3. Send category metrics to an external system to initiate consumption.
> This metrics information can be passed through the
> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
> method.
> And `Update category metrics`/`Send category metrics` can be done in the
> org.apache.flink.api.connector.sink2.Committer#commit method.
>
> The rollback action can be done in the SinkWriter or the Committer. To
> delete files from S3, you need to pass the file information through
> PrecommittingSinkWriter#prepareCommit as well. Then you can throw an
> exception to let the Flink job fail over and retry.
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
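
My reading of the suggested approach is roughly the skeleton below, based
only on the linked Javadoc; Event, BatchCommittable, and all other names are
placeholders, and the exact method signatures differ between Flink versions:

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

// Placeholder input record and committable describing one micro-batch: the S3
// keys written by a writer plus its per-category metrics, so the committer can
// update the manifest, notify the external system, or clean up on rollback.
class Event implements Serializable {}

class BatchCommittable implements Serializable {}

public class CategorySink implements TwoPhaseCommittingSink<Event, BatchCommittable> {

    @Override
    public PrecommittingSinkWriter<Event, BatchCommittable> createWriter(InitContext context) {
        return new CategoryWriter();
    }

    @Override
    public Committer<BatchCommittable> createCommitter() {
        return new CategoryCommitter();
    }

    static class CategoryWriter implements PrecommittingSinkWriter<Event, BatchCommittable> {

        @Override
        public void write(Event element, Context context) throws IOException {
            // Step 1: stage data per category under its S3 prefix (not yet visible).
        }

        @Override
        public Collection<BatchCommittable> prepareCommit() throws IOException {
            // Hand the staged file names and per-category metrics to the committer.
            return Collections.singletonList(new BatchCommittable());
        }

        @Override
        public void flush(boolean endOfInput) throws IOException {}

        @Override
        public void close() throws Exception {}
    }

    static class CategoryCommitter implements Committer<BatchCommittable> {

        @Override
        public void commit(Collection<CommitRequest<BatchCommittable>> requests) throws IOException {
            for (CommitRequest<BatchCommittable> request : requests) {
                // Steps 2 and 3: update the manifest and notify the external
                // system; on failure, signal a retry (e.g. request.retryLater())
                // instead of silently dropping the batch.
            }
        }

        @Override
        public void close() throws Exception {}
    }
}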
>
>
>
> On Oct 14, 2024, at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>
> Hello,
>
> I am looking to implement a Flink sink for the following use case, where
> the steps below are executed for each microbatch (using Spark terminology)
> or trigger:
>
>    1. Group data by category and write it to S3 under its respective
>    prefix.
>    2. Update category metrics in a manifest file stored in the S3
>    manifest prefix.
>    3. Send category metrics to an external system to initiate consumption.
>
> In case of failure, any previously completed steps should be rolled back,
> i.e., delete files from S3 and reprocess the entire microbatch.
>
> It seems this pattern can be implemented using the Unified Sink API, as
> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
>
> I'm currently reviewing FileSink and IcebergSink (still searching for the
> source code) to understand their implementations and create a new one.
>
> Are there any step-by-step docs or examples available for writing a new
> unified sink?
>
> Thanks
>
>
>
>
