Hi Aian,

Which sink API are you using?
Have you tried the Sink v2 API [1]?

If you implement the WithPostCommitTopology interface [2], then you can
provide a follow-up step after the commits are finished. I have not tried
it yet, but I expect that the failed Committables are emitted as well and
are available for further processing.
Be aware that the Committables arrive at the PostCommitTopology one
checkpoint cycle after they are created. So if the data arrives in
checkpoint 1 (C1), then we checkpoint it in C1, commit it in
C1.notifyCheckpointCompleted, and the PostCommitTopology will be able to
retry the commit in the following checkpoint (C2).
Also, there is a known open issue at the end of streams: the last
checkpoint's Committables are not processed by the PostCommitTopology at the
moment [3].
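
To make the timing concrete, here is a toy model in plain Java. It does not
use any Flink classes: PostCommitTimingSketch, runCheckpoint and neverSeen
are names invented for illustration, and the model only mirrors the
behaviour described above, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are Flink classes.
class PostCommitTimingSketch {

    // Committed in an earlier cycle, not yet seen by the post-commit step.
    private final List<String> awaitingPostCommit = new ArrayList<>();

    // Entries of the form "payload@C<checkpoint in which post-commit saw it>".
    final List<String> seenByPostCommit = new ArrayList<>();

    // One checkpoint cycle: the post-commit step first receives what was
    // committed after the previous checkpoint, then this cycle's
    // committables are committed and queued for the next cycle.
    void runCheckpoint(int checkpointId, List<String> newCommittables) {
        for (String committable : awaitingPostCommit) {
            seenByPostCommit.add(committable + "@C" + checkpointId);
        }
        awaitingPostCommit.clear();
        // Stands in for the commit done in notifyCheckpointCompleted.
        awaitingPostCommit.addAll(newCommittables);
    }

    // Committables that were committed but never reached the post-commit step.
    List<String> neverSeen() {
        return new ArrayList<>(awaitingPostCommit);
    }

    public static void main(String[] args) {
        PostCommitTimingSketch job = new PostCommitTimingSketch();
        job.runCheckpoint(1, List.of("a"));
        job.runCheckpoint(2, List.of("b"));
        // The stream ends here, so no further checkpoint picks up "b".
        System.out.println(job.seenByPostCommit); // prints [a@C2]
        System.out.println(job.neverSeen());      // prints [b]
    }
}
```

In this model "a" is created in C1 and only becomes visible in C2, and "b"
from the last checkpoint is never handed on, which is the situation
tracked in [3].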

Alternatively, you can implement retries in the Sink v2 Committer [4] as
well, and then you don't have to rely on the PostCommitTopology.
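
A minimal sketch of what such a retry could look like, written against
local stand-in types so that it runs without Flink. The nested
CommitRequest interface is only loosely modelled on the request object
handed to a Sink v2 Committer (please check the Javadoc for the exact
methods); Endpoint, RecordingRequest and MAX_RETRIES are hypothetical names
and values chosen for illustration.

```java
import java.util.Collection;
import java.util.List;

class RetryingCommitterSketch {

    // Local stand-in, loosely modelled on Flink's commit request; NOT the real interface.
    interface CommitRequest<T> {
        T getCommittable();
        int getNumberOfRetries();
        void retryLater();
        void signalFailedWithKnownReason(Throwable reason);
    }

    // Hypothetical client for the external system the sink writes to.
    interface Endpoint {
        void write(String payload) throws Exception;
    }

    // Assumed retry budget; pick whatever fits the endpoint.
    static final int MAX_RETRIES = 3;

    private final Endpoint endpoint;

    RetryingCommitterSketch(Endpoint endpoint) {
        this.endpoint = endpoint;
    }

    // Try each committable once; on failure either ask for a retry or give up.
    void commit(Collection<? extends CommitRequest<String>> requests) {
        for (CommitRequest<String> request : requests) {
            try {
                endpoint.write(request.getCommittable());
            } catch (Exception e) {
                if (request.getNumberOfRetries() < MAX_RETRIES) {
                    request.retryLater();
                } else {
                    request.signalFailedWithKnownReason(e);
                }
            }
        }
    }

    // In-memory request that records what the committer decided.
    static final class RecordingRequest implements CommitRequest<String> {
        final String payload;
        final int retries;
        String outcome = "committed";

        RecordingRequest(String payload, int retries) {
            this.payload = payload;
            this.retries = retries;
        }

        public String getCommittable() { return payload; }
        public int getNumberOfRetries() { return retries; }
        public void retryLater() { outcome = "retry"; }
        public void signalFailedWithKnownReason(Throwable reason) { outcome = "failed"; }
    }

    public static void main(String[] args) {
        RetryingCommitterSketch committer =
                new RetryingCommitterSketch(p -> { throw new Exception("endpoint down"); });
        RecordingRequest first = new RecordingRequest("row-1", 0);
        RecordingRequest last = new RecordingRequest("row-2", MAX_RETRIES);
        committer.commit(List.of(first, last));
        System.out.println(first.outcome + ", " + last.outcome); // prints retry, failed
    }
}
```

What should happen once the retry budget is used up (for example, handing
the data to the fallback sink4 from the question) is left open in this
sketch.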

I hope this helps,
Péter

- [1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
- [2]
https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.html
- [3] https://issues.apache.org/jira/browse/FLINK-30238
- [4] https://issues.apache.org/jira/browse/FLINK-30238

On Wed, Oct 18, 2023, 18:08 Aian Cantabrana <acantabr...@zylk.net> wrote:

> Hi,
>
> We have a use case where we need to ensure that data reaches all
> endpoints/sinks one by one or to split the flow if any of them fails. Here
> is a schema of the use case:
>
> Current job:  Source -> filters/maps/process -> sink1
>                                             \-> sink2
>                                             \-> sink3
>
> Desired job:  Source -> filters/maps/process -> sink1 -> if OK sink2 -> if OK sink3
>                                                      \-> if NOK sink4
>                                                                     \-> if NOK sink4
>
> In order to achieve it, we would like to have some kind of side output
> coming out from our sinks but side outputs are not available in
> SinkFunctions. The only way we have come up with is to implement the sink
> functionality inside a ProcessFunction or just call the sink.invoke()
> method from inside a process function but we hope there is a better/cleaner
> way to do it.
>
> We are working with the DataStream API in Java.
>
> Thanks in advance,
>
> Aian
>
> --
> -----------------------------------------
> Aian Cantabrana
>
> ZYLK.net :: consultoría.openSource
> Ribera de Axpe, 11
> Edificio A, modulo 201-203
> 48950 Erandio (Bizkaia)
>
> telf.: +34 747421343
> ofic.: +34 944272119
> -----------------------------------------
>
