Hi Aian,

Which sink API are you using? Have you tried the Sink v2 API [1]?
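In Sink v2, commits go through a Committer, whose commit(...) method receives a collection of CommitRequests. As far as I know, each request exposes getNumberOfRetries() and can either be handed back with retryLater() or failed with signalFailedWithKnownReason(...). That is what makes retrying inside the Committer itself possible. The following minimal sketch shows a bounded retry of that kind. To keep it runnable without Flink on the classpath, CommitRequest here is a simplified local stand-in that only mirrors those methods, not the Flink type. ExternalStore, FakeRequest and maxRetries are hypothetical names for illustration.

```java
import java.util.Collection;

/**
 * Minimal sketch of a Sink v2 style committer that retries failed commits a
 * bounded number of times. NOTE: CommitRequest below is a simplified local
 * stand-in that only mirrors a few methods of Flink's Committer.CommitRequest;
 * it is NOT the Flink type, so the sketch runs without Flink on the classpath.
 */
public class RetryingCommitterSketch<T> {

    /** Simplified stand-in for Flink's Committer.CommitRequest (assumption). */
    public interface CommitRequest<T> {
        T getCommittable();

        int getNumberOfRetries();

        void retryLater();

        void signalFailedWithKnownReason(Throwable t);
    }

    /** Hypothetical client for the external system the sink commits to. */
    public interface ExternalStore<T> {
        void commit(T committable) throws Exception;
    }

    private final ExternalStore<T> store;
    private final int maxRetries;

    public RetryingCommitterSketch(ExternalStore<T> store, int maxRetries) {
        this.store = store;
        this.maxRetries = maxRetries;
    }

    /** Shaped like Committer#commit: one commit attempt per request per call. */
    public void commit(Collection<? extends CommitRequest<T>> requests) {
        for (CommitRequest<T> request : requests) {
            try {
                store.commit(request.getCommittable());
                // No signal on success: the request counts as committed.
            } catch (Exception e) {
                if (request.getNumberOfRetries() < maxRetries) {
                    // Hand the committable back so it is offered again later.
                    request.retryLater();
                } else {
                    // Retry budget used up: fail this committable for good.
                    request.signalFailedWithKnownReason(e);
                }
            }
        }
    }

    /** Test double that records what the committer decided for one request. */
    public static class FakeRequest<T> implements CommitRequest<T> {
        private final T committable;
        private final int retries;
        public String outcome = "COMMITTED";

        public FakeRequest(T committable, int retries) {
            this.committable = committable;
            this.retries = retries;
        }

        @Override
        public T getCommittable() {
            return committable;
        }

        @Override
        public int getNumberOfRetries() {
            return retries;
        }

        @Override
        public void retryLater() {
            outcome = "RETRY";
        }

        @Override
        public void signalFailedWithKnownReason(Throwable t) {
            outcome = "FAILED";
        }
    }
}
```

As I understand it, retryLater() only hands the committable back to the framework; when the next attempt actually happens is up to Flink, not to the committer.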
If you implement the WithPostCommitTopology interface [2], you can provide a follow-up step that runs after the commits are finished. I have not tried it yet, but I expect that the failed Committables are emitted as well and are available for further processing.

Be aware that the Committables arrive at the PostCommitTopology one checkpoint cycle after the one in which they are created. So if the data arrives in checkpoint 1 (C1), we checkpoint it in C1, commit it in the notifyCheckpointComplete call for C1, and the PostCommitTopology will only be able to retry the commit in the following checkpoint (C2).

Also, there is a known open issue at the end of streams: the Committables of the last checkpoint are currently not processed by the PostCommitTopology [3].

Alternatively, you can implement the retries in the Sink v2 Committer [4] itself; then you don't have to rely on the PostCommitTopology at all.

I hope this helps,
Péter

- [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
- [2] https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/streaming/api/connector/sink2/WithPostCommitTopology.html
- [3] https://issues.apache.org/jira/browse/FLINK-30238
- [4] https://issues.apache.org/jira/browse/FLINK-30238

On Wed, Oct 18, 2023, 18:08 Aian Cantabrana <acantabr...@zylk.net> wrote:
> Hi,
>
> We have a use case where we need to ensure that data reaches all
> endpoints/sinks one by one, or to split the flow if any of them fails. Here
> is a schema of the use case:
>
> Current job: Source -> filters/maps/process -> sink1
>                                            \-> sink2
>                                            \-> sink3
>
> Desired job: Source -> filters/maps/process -> sink1 -> if OK sink2 -> if OK sink3
>                                                     \-> if NOK sink4
>                                                                    \-> if NOK sink4
>
> In order to achieve this, we would like to have some kind of side output
> coming out of our sinks, but side outputs are not available in
> SinkFunctions.
> The only way we have come up with is to implement the sink functionality
> inside a ProcessFunction, or to just call the sink.invoke() method from
> inside a process function, but we hope there is a better/cleaner way to do it.
>
> We are working with the DataStream API in Java.
>
> Thanks in advance,
>
> Aian
>
> --
> -----------------------------------------
> Aian Cantabrana
>
> ZYLK.net :: consultoría.openSource
> Ribera de Axpe, 11
> Edificio A, modulo 201-203
> 48950 Erandio (Bizkaia)
>
> telf.: +34 747421343
> ofic.: +34 944272119
> -----------------------------------------