Hi Yanquan,
Could you provide some insights and help with this? I see similar requests
in the Flink Slack channel but no active conversations; the channels don't
seem very active, and the documentation lacks examples for the different
approaches.

I'm trying to implement micro-batching with near real-time processing and
have explored the following options:

   1. *Windowing*: This seemed promising, but flushing requires
   record-level checks because the window information isn't available
   further down the pipeline (see the sketch below).
   2. *Window + Trigger*: This buffers events until the trigger interval
   is reached, which makes it non-real-time; events are only processed
   when the trigger fires.
   3. *Processing Time*: The processing time is local to each file writer,
   so it isn't consistent across different task managers.
   4. *Watermark*: There's no global watermark; it's specific to each
   source task, and the initial watermark (before the first watermark
   event arrives) isn't a meaningful epoch timestamp.

I want to write the data grouped by time (micro-batch time) and then by
category. What's the best approach to achieve micro-batching in Flink?
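
For reference, here is a rough, untested sketch of what I mean by option 1:
stamping each record with its window start so the window (micro-batch) time
is available further down the pipeline, and bucketing the FileSink output by
that time and then by category. The class/field names and the 1-minute
interval are just placeholders:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MicroBatchByWindowStart {

    /** Placeholder event type; batchTime is filled in by the window function. */
    public static class Event {
        public String category;
        public String payload;
        public long batchTime;
    }

    /** Stamps every record with the start of its window (the micro-batch time). */
    public static class StampBatchTime
            extends ProcessWindowFunction<Event, Event, String, TimeWindow> {
        @Override
        public void process(String category, Context ctx,
                            Iterable<Event> events, Collector<Event> out) {
            for (Event e : events) {
                e.batchTime = ctx.window().getStart();
                out.collect(e);
            }
        }
    }

    /** Buckets output as <micro-batch time>/<category>/part-... */
    public static class BatchTimeCategoryBucketAssigner
            implements BucketAssigner<Event, String> {
        @Override
        public String getBucketId(Event e, Context context) {
            return e.batchTime + "/" + e.category;
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    public static void wire(DataStream<Event> events, String outputPath) {
        DataStream<Event> stamped = events
                .keyBy(e -> e.category)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .process(new StampBatchTime());

        stamped.sinkTo(
                FileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<Event>())
                        .withBucketAssigner(new BatchTimeCategoryBucketAssigner())
                        .build());
    }
}

It still depends on every record carrying the batch time, which is the
record-level bookkeeping I'd like to avoid.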

Thanks

On Fri, Oct 25, 2024 at 2:35 AM Anil Dasari <adas...@guidewire.com> wrote:

> Hello all,
>
> Are there Flink patterns that support microbatching and ensure all data
> for a microbatch is written to a specific prefix in the destination with
> exactly-once delivery?
> I’ve explored both window and processing time (implemented in FileSink)
> options to set the boundaries for microbatches.
>
> However, processing time can't establish data boundaries by timestamp
> across multiple sink writer tasks when each writer starts at a different
> time due to Flink's auto-scaling. If I understand correctly, the
> processing-time approach operates on a fixed interval within a single sink
> writer (e.g., FileSink) rather than as a fixed-interval task coordinated
> across multiple sink writers.
>
> The window option requires checking both the current and the previous
> element's window start times to determine the boundaries, which complicates
> the data flush process.
> Is there a way to listen for the window start/end event in all the pipeline
> operators downstream of the window operator, to effectively implement this
> as an OnProcessTime function in FileSink? A rough sketch of what I mean is
> below.
>
> If yes, could you please point me to the docs or examples, if any? Thanks
> in advance.
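>
> To make the OnProcessTime idea concrete, below is a rough, untested sketch
> of the kind of sink writer I have in mind (if I read the Sink V2 interfaces
> right, the ProcessingTimeService comes from the InitContext passed to
> createWriter; the names, the String element type and the flush logic are
> placeholders):
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.flink.api.common.operators.ProcessingTimeService;
> import org.apache.flink.api.connector.sink2.SinkWriter;
>
> /** Buffers records and flushes them as one micro-batch when the timer fires. */
> public class MicroBatchSinkWriter
>         implements SinkWriter<String>, ProcessingTimeService.ProcessingTimeCallback {
>
>     private final ProcessingTimeService timeService;
>     private final long batchIntervalMs;
>     private final List<String> buffer = new ArrayList<>();
>
>     public MicroBatchSinkWriter(ProcessingTimeService timeService, long batchIntervalMs) {
>         this.timeService = timeService;
>         this.batchIntervalMs = batchIntervalMs;
>         // First boundary; the timer is local to this writer subtask, not global,
>         // which is exactly the consistency problem described above.
>         timeService.registerTimer(
>                 timeService.getCurrentProcessingTime() + batchIntervalMs, this);
>     }
>
>     @Override
>     public void write(String element, Context context) {
>         buffer.add(element);
>     }
>
>     @Override
>     public void onProcessingTime(long time) {
>         flushBatch(time);
>         // Schedule the boundary for the next micro-batch.
>         timeService.registerTimer(time + batchIntervalMs, this);
>     }
>
>     @Override
>     public void flush(boolean endOfInput) {
>         flushBatch(timeService.getCurrentProcessingTime());
>     }
>
>     private void flushBatch(long batchTime) {
>         // Write the buffered records to the prefix for batchTime (omitted).
>         buffer.clear();
>     }
>
>     @Override
>     public void close() {
>     }
> }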
>
> Thanks
>
> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>
>> Hi, Anil.
>>
>> The Iceberg Sink was recently merged in
>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>> .
>>
>> From your description, I guess that what you need is
>> a TwoPhaseCommittingSink [1]; the steps you listed can be executed as
>> follows:
>>
>> > 1. Group data by category and write it to S3 under its respective
>> prefix.
>> This can be done in the
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>> method.
>>
>> > 2. Update category metrics in a manifest file stored in the S3 manifest
>> prefix.
>> > 3. Send category metrics to an external system to initiate consumption.
>> This metrics information can be passed through the
>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>> method, and the `Update category metrics` / `Send category metrics` steps
>> can be done in the org.apache.flink.api.connector.sink2.Committer#commit
>> method.
>>
>> The rollback action can be done in the SinkWriter or the Committer; to
>> delete files from S3, you need to pass the file information through
>> PrecommittingSinkWriter#prepareCommit as well. Then you can throw an
>> exception to let the Flink job fail over and retry.
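>>
>> To illustrate where each of your steps would go, here is a rough, untested
>> skeleton (all class and field names are placeholders; the committable
>> serializer and the actual S3/manifest/notification calls are omitted):
>>
>> import java.util.ArrayList;
>> import java.util.Collection;
>> import java.util.List;
>>
>> import org.apache.flink.api.connector.sink2.Committer;
>> import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
>> import org.apache.flink.core.io.SimpleVersionedSerializer;
>>
>> public class CategorySink
>>         implements TwoPhaseCommittingSink<String, CategorySink.CategoryBatch> {
>>
>>     /** Committable describing what one micro-batch wrote for one category. */
>>     public static class CategoryBatch implements java.io.Serializable {
>>         public String category;
>>         public List<String> files = new ArrayList<>();
>>         public long recordCount;
>>     }
>>
>>     @Override
>>     public PrecommittingSinkWriter<String, CategoryBatch> createWriter(InitContext context) {
>>         return new CategoryWriter();
>>     }
>>
>>     @Override
>>     public Committer<CategoryBatch> createCommitter() {
>>         return new CategoryCommitter();
>>     }
>>
>>     @Override
>>     public SimpleVersionedSerializer<CategoryBatch> getCommittableSerializer() {
>>         // A real sink needs a versioned serializer so committables survive failover.
>>         throw new UnsupportedOperationException("omitted in this sketch");
>>     }
>>
>>     static class CategoryWriter implements PrecommittingSinkWriter<String, CategoryBatch> {
>>         @Override
>>         public void write(String element, Context context) {
>>             // Step 1: write the element to S3 under its category prefix and
>>             // track the created files and per-category counts.
>>         }
>>
>>         @Override
>>         public Collection<CategoryBatch> prepareCommit() {
>>             // Hand the per-category files and metrics to the committer.
>>             return new ArrayList<>();
>>         }
>>
>>         @Override
>>         public void flush(boolean endOfInput) {
>>         }
>>
>>         @Override
>>         public void close() {
>>         }
>>     }
>>
>>     static class CategoryCommitter implements Committer<CategoryBatch> {
>>         @Override
>>         public void commit(Collection<CommitRequest<CategoryBatch>> requests) {
>>             for (CommitRequest<CategoryBatch> request : requests) {
>>                 CategoryBatch batch = request.getCommittable();
>>                 // Steps 2 and 3: update the manifest file and notify the external
>>                 // system; on failure, delete batch.files and fail over / retry.
>>             }
>>         }
>>
>>         @Override
>>         public void close() {
>>         }
>>     }
>> }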
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>
>>
>>
>> On Oct 14, 2024, at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
>>
>> Hello,
>>
>> I am looking to implement a Flink sink for the following use case, where
>> the steps below are executed for each microbatch (using Spark terminology)
>> or trigger:
>>
>>    1. Group data by category and write it to S3 under its respective
>>    prefix.
>>    2. Update category metrics in a manifest file stored in the S3
>>    manifest prefix.
>>    3. Send category metrics to an external system to initiate
>>    consumption.
>>
>> In case of failure, any previously completed steps should be rolled back,
>> i.e., the files should be deleted from S3 and the entire microbatch
>> reprocessed.
>>
>> It seems this pattern can be implemented using the Unified Sink API, as
>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A
>> .
>>
>> I'm currently reviewing FileSink and IcebergSink (still searching for
>> the source code) to understand their implementations and create a new one.
>>
>> Are there any step-by-step docs or examples available for writing a new
>> unified sink?
>>
>> Thanks
>>
>>
>>
>>
