Hi Yanquan,
I've finished reading the Sink FLIPs and am now reviewing some of the sink
implementations, like TestSinkV2, to better understand the flow. I'll write
a new one to experiment with.
Are there flink sink docs/flow daigrams like detailed source implementation
docs like
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
?

Thanks.

On Mon, Oct 14, 2024 at 10:18 AM Yanquan Lv <decq12y...@gmail.com> wrote:

> Hi, Anil.
> For your scenario, I think looking at FLIP-143 first and then FLIP-191
> should provide a better understanding. Then you can look at other FLIPs or
> specific implementations.
>
> Anil Dasari <adas...@guidewire.com> 于2024年10月15日周二 00:55写道:
>
>> Got it. thanks.
>> Sink improvements have many FLIP confluence pages i.e FLIP-143, 171, 177
>> and 191. So, Is there a sequence of steps flow charts for better
>> understanding of the sink process with sink, writer and committer ?
>>
>> Thanks
>>
>> On Mon, Oct 14, 2024 at 9:48 AM Yanquan Lv <decq12y...@gmail.com> wrote:
>>
>>> Yeah, TwoPhaseCommittingSink will be removed in Flink 2.0, and it will
>>> be replaced by SupportsCommitter[1] interface, which was introduced in
>>> Flink 1.19.
>>> But you can still use TwoPhaseCommittingSink under Flink 2.0, it depends
>>> on your target Flink version, The interfaces of these two APIs are almost
>>> identical.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html
>>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html>
>>>
>>> Anil Dasari <adas...@guidewire.com> 于2024年10月14日周一 23:26写道:
>>>
>>>> Hi Yanquan,
>>>> Thanks for sharing the information.
>>>> It appears that TwoPhaseCommittingSink is not available in the flink
>>>> repo main branch. it is replaced with Sink, Committer and SinkWritter ?
>>>>
>>>> Thanks
>>>>
>>>> On Mon, Oct 14, 2024 at 1:45 AM Yanquan Lv <decq12y...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, Anil.
>>>>>
>>>>> Iceberg Sink is merged recently in
>>>>> https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880
>>>>> <https://github.com/apache/iceberg/pull/10179#pullrequestreview-2350414880>
>>>>> .
>>>>>
>>>>> From your description, I guess that what you need is
>>>>> a TwoPhaseCommittingSink[1], the steps you listed can be executed with the
>>>>> following steps:
>>>>>
>>>>> > 1. Group data by category and write it to S3 under its respective
>>>>> prefix.
>>>>> This can be done in
>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#write
>>>>>  method.
>>>>>
>>>>> > 2. Update category metrics in a manifest file stored in the S3
>>>>> manifest prefix.
>>>>> > 3. Send category metrics to an external system to initiate
>>>>> consumption.
>>>>> These metrics information could be passed by
>>>>>  
>>>>> org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink$PrecommittingSinkWriter#prepareCommit
>>>>> method.
>>>>> And `Update category metrics`/`Send category metrics` can be done in
>>>>> org.apache.flink.api.connector.sink2.Committer#commit method.
>>>>>
>>>>> Rollback action could be done in SinkWriter or Committer, to delete
>>>>> files from S3, you need to pass the files information though
>>>>> PrecommittingSinkWriter#prepareCommit too. Then you can throw exception to
>>>>> let Flink job failover and retry.
>>>>>
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html
>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.20/api/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.html>
>>>>>
>>>>>
>>>>>
>>>>> 2024年10月14日 12:49,Anil Dasari <adas...@guidewire.com> 写道:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am looking to implement a Flink sink for the following use case,
>>>>> where the steps below are executed for each microbatch (using Spark
>>>>> terminology) or trigger:
>>>>>
>>>>>    1. Group data by category and write it to S3 under its respective
>>>>>    prefix.
>>>>>    2. Update category metrics in a manifest file stored in the S3
>>>>>    manifest prefix.
>>>>>    3. Send category metrics to an external system to initiate
>>>>>    consumption.
>>>>>
>>>>> In case of failure, any previously completed steps should be rolled
>>>>> back, i.e., delete files from S3 and reprocess the entire microbatch.
>>>>>
>>>>> It seems this pattern can be implemented using the Unified Sink API,
>>>>> as discussed in this video:
>>>>> https://www.youtube.com/watch?v=0GVI25OEs4A
>>>>> <https://www.youtube.com/watch?v=0GVI25OEs4A>
>>>>> .
>>>>>
>>>>> I'm currently reviewing FileSink and IcebergSink (still searching for
>>>>> the source code) to understand their implementations and create a new one.
>>>>>
>>>>> Are there any step-by-step docs or examples available for writing a
>>>>> new unified sink?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>

Reply via email to