Hi, Anil.

The Iceberg Sink was merged recently in 
https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.

From your description, I think what you need is a 
TwoPhaseCommittingSink[1]. The steps you listed map onto it as follows:

> 1. Group data by category and write it to S3 under its respective prefix.
This can be done in the 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
method.

> 2. Update category metrics in a manifest file stored in the S3 manifest 
> prefix.
> 3. Send category metrics to an external system to initiate consumption.
The metrics can be passed on as committables returned by the 
org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
method.
Both `Update category metrics` and `Send category metrics` can then be done in 
the org.apache.flink.api.connector.sink2.Committer#commit method.

The rollback can be done in either the SinkWriter or the Committer. To delete 
files from S3, you also need to pass the file information through 
PrecommittingSinkWriter#prepareCommit. You can then throw an exception to make 
the Flink job fail over and retry.
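
To make the flow above concrete, here is a minimal plain-Java sketch of the 
same shape. It does not use the Flink API: CategoryWriter, CategoryCommitter 
and CategoryCommittable are hypothetical stand-ins for the 
PrecommittingSinkWriter, the Committer and the committable type, the "s3" set 
is an in-memory substitute for the bucket, and the data/<category>/part-<id> 
key layout is only an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical committable: what the writer hands over to the committer. */
final class CategoryCommittable {
    final String category;
    final String fileKey;    // object written for this category
    final long recordCount;  // the category metric

    CategoryCommittable(String category, String fileKey, long recordCount) {
        this.category = category;
        this.fileKey = fileKey;
        this.recordCount = recordCount;
    }
}

/** Stand-in for the writer: write() groups records, prepareCommit() flushes them. */
final class CategoryWriter {
    private final Set<String> s3; // in-memory stand-in for the bucket
    private final Map<String, List<String>> buffer = new TreeMap<>();

    CategoryWriter(Set<String> s3) { this.s3 = s3; }

    void write(String category, String record) {
        buffer.computeIfAbsent(category, k -> new ArrayList<>()).add(record);
    }

    List<CategoryCommittable> prepareCommit(long checkpointId) {
        List<CategoryCommittable> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : buffer.entrySet()) {
            String key = "data/" + e.getKey() + "/part-" + checkpointId;
            s3.add(key); // "upload" one file under the category prefix (step 1)
            out.add(new CategoryCommittable(e.getKey(), key, e.getValue().size()));
        }
        buffer.clear();
        return out; // file keys and metrics travel together to the committer
    }
}

/** Stand-in for the committer: manifest update, notification, rollback on failure. */
final class CategoryCommitter {
    private final Set<String> s3;
    private final boolean externalSystemUp; // toggles a simulated failure
    final Map<String, Long> manifest = new TreeMap<>();
    final List<String> notifications = new ArrayList<>();

    CategoryCommitter(Set<String> s3, boolean externalSystemUp) {
        this.s3 = s3;
        this.externalSystemUp = externalSystemUp;
    }

    void commit(List<CategoryCommittable> committables) {
        Map<String, Long> before = new TreeMap<>(manifest);
        try {
            for (CategoryCommittable c : committables) {
                manifest.merge(c.category, c.recordCount, Long::sum); // step 2
            }
            if (!externalSystemUp) {
                throw new IllegalStateException("external system unavailable");
            }
            for (CategoryCommittable c : committables) {
                notifications.add(c.category + "=" + c.recordCount); // step 3
            }
        } catch (RuntimeException e) {
            // Rollback: restore the manifest and delete the files of this batch,
            // then rethrow so the job can fail over and retry.
            manifest.clear();
            manifest.putAll(before);
            for (CategoryCommittable c : committables) {
                s3.remove(c.fileKey);
            }
            throw e;
        }
    }
}

final class TwoPhaseSketch {
    public static void main(String[] args) {
        Set<String> s3 = new HashSet<>();
        CategoryWriter writer = new CategoryWriter(s3);
        writer.write("books", "r1");
        writer.write("games", "r2");
        writer.write("books", "r3");
        CategoryCommitter committer = new CategoryCommitter(s3, true);
        committer.commit(writer.prepareCommit(1L));
        System.out.println(committer.manifest + " files=" + s3.size());
    }
}
```

In the real API, prepareCommit returns a collection of committables and 
Committer#commit receives them wrapped in CommitRequest objects, so the 
signatures differ from this sketch; only the division of work between writer 
and committer is meant to carry over.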


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html



> On Oct 14, 2024, at 12:49, Anil Dasari <adas...@guidewire.com> wrote:
> 
> Hello,
> I am looking to implement a Flink sink for the following use case, where the 
> steps below are executed for each microbatch (using Spark terminology) or 
> trigger:
> 
> 1. Group data by category and write it to S3 under its respective prefix.
> 2. Update category metrics in a manifest file stored in the S3 manifest prefix.
> 3. Send category metrics to an external system to initiate consumption.
> 4. In case of failure, any previously completed steps should be rolled back, 
> i.e., delete files from S3 and reprocess the entire microbatch.
> 
> It seems this pattern can be implemented using the Unified Sink API, as 
> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
> 
> I'm currently reviewing FileSink and IcebergSink (still searching for the 
> source code) to understand their implementations and create a new one. 
> 
> Are there any step-by-step docs or examples available for writing a new 
> unified sink?
> 
> Thanks
> 
> 
> 
