Sorry, I couldn't find any clear and detailed user guidance other than FLIP in 
the official documentation too.


> 2024年10月15日 01:39,Anil Dasari <adas...@guidewire.com> 写道:
> 
> Hi Yanquan,
> I've finished reading the Sink FLIPs and am now reviewing some of the sink 
> implementations, like TestSinkV2, to better understand the flow. I'll write a 
> new one to experiment with.
> Are there flink sink docs/flow daigrams like detailed source implementation 
> docs like 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>  ?
> 
> Thanks.
> 
> On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv <decq12y...@gmail.com 
> <mailto:decq12y...@gmail.com>> wrote:
>> Hi, Anil.
>> For your scenario, I think looking at FLIP-143 first and then FLIP-191 
>> should provide a better understanding. Then you can look at other FLIPs or 
>> specific implementations.
>> 
>> Anil Dasari <adas...@guidewire.com <mailto:adas...@guidewire.com>> 
>> 于2024年10月15日周二 00:55写道:
>>> Got it. thanks. 
>>> Sink improvements have many FLIP confluence pages i.e FLIP-143, 171, 177 
>>> and 191. So, Is there a sequence of steps flow charts for better 
>>> understanding of the sink process with sink, writer and committer ? 
>>> 
>>> Thanks
>>> 
>>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com 
>>> <mailto:decq12y...@gmail.com>> wrote:
>>>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will be 
>>>> replaced by SupportsCommitter[1] interface, which was introduced in Flink 
>>>> 1.19.
>>>> But you can still use TwoPhaseCommittingSink under Flink 2.0, it depends 
>>>> on your target Flink version, The interfaces of these two APIs are almost 
>>>> identical.
>>>> 
>>>> [1] 
>>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>>>> 
>>>> Anil Dasari <adas...@guidewire.com <mailto:adas...@guidewire.com>> 
>>>> 于2024年10月14日周一 23:26写道:
>>>>> Hi Yanquan,
>>>>> Thanks for sharing the information.
>>>>> It appears that TwoPhaseCommittingSink is not available in the flink repo 
>>>>> main branch. it is replaced with Sink, Committer and SinkWritter ?
>>>>> 
>>>>> Thanks
>>>>> 
>>>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com 
>>>>> <mailto:decq12y...@gmail.com>> wrote:
>>>>>> Hi, Anil.
>>>>>> 
>>>>>> Iceberg Sink is merged recently in 
>>>>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880.
>>>>>> 
>>>>>> From your description, I guess that what you need is a 
>>>>>> TwoPhaseCommittingSink[1], the steps you listed can be executed with the 
>>>>>> following steps:
>>>>>> 
>>>>>> > 1. Group data by category and write it to S3 under its respective 
>>>>>> > prefix.
>>>>>> This can be done in 
>>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>>>>  method.
>>>>>> 
>>>>>> > 2. Update category metrics in a manifest file stored in the S3 
>>>>>> > manifest prefix.
>>>>>> > 3. Send category metrics to an external system to initiate consumption.
>>>>>> These metrics information could be passed by  
>>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>>>>>  method.
>>>>>> And `Update category metrics`/`Send category metrics` can be done in 
>>>>>> org.apache.flink.api.connector.sink2.Committer#commit method.
>>>>>> 
>>>>>> Rollback action could be done in SinkWriter or Committer, to delete 
>>>>>> files from S3, you need to pass the files information though 
>>>>>> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception 
>>>>>> to let Flink job failover and retry.
>>>>>> 
>>>>>> 
>>>>>> [1] 
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> 2024年10月14日 12:49,Anil Dasari <adas...@guidewire.com 
>>>>>>> <mailto:adas...@guidewire.com>> 写道:
>>>>>>> 
>>>>>>> Hello,
>>>>>>> I am looking to implement a Flink sink for the following use case, 
>>>>>>> where the steps below are executed for each microbatch (using Spark 
>>>>>>> terminology) or trigger:
>>>>>>> 
>>>>>>> Group data by category and write it to S3 under its respective prefix.
>>>>>>> Update category metrics in a manifest file stored in the S3 manifest 
>>>>>>> prefix.
>>>>>>> Send category metrics to an external system to initiate consumption.
>>>>>>> In case of failure, any previously completed steps should be rolled 
>>>>>>> back, i.e., delete files from S3 and reprocess the entire microbatch.
>>>>>>> 
>>>>>>> It seems this pattern can be implemented using the Unified Sink API, as 
>>>>>>> discussed in this video: https://www.youtube.com/watch?v=0GVI25OEs4A.
>>>>>>> 
>>>>>>> I'm currently reviewing FileSink and IcebergSink (still searching for 
>>>>>>> the source code) to understand their implementations and create a new 
>>>>>>> one. 
>>>>>>> 
>>>>>>> Are there any step-by-step docs or examples available for writing a new 
>>>>>>> unified sink?
>>>>>>> 
>>>>>>> Thanks
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 

Reply via email to