Hi,

Sources, when finishing, emit {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}, so I think the best approach is to register an event time timer for {{Watermark#MAX_WATERMARK}} (or maybe {{Watermark#MAX_WATERMARK - 1}}). If your function registers such a timer, it will fire only after that function has processed all of its records (keep in mind Flink is a distributed system, so downstream operators/functions might still be busy processing the last records for some time while upstream operators/functions have already finished).
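A minimal sketch of that timer approach, assuming a keyed stream of strings; the class name {{NotifyOnCompletion}} and the emitted marker record are made up for illustration, and what you actually do in {{onTimer}} (e.g. the notification) is up to you:

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

// Hypothetical function: forwards records and fires once per key after the
// sources have finished and MAX_WATERMARK has propagated down to it.
public class NotifyOnCompletion extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Timers are deduplicated per key and timestamp, so registering the
        // same MAX_WATERMARK timer on every element is cheap and idempotent.
        ctx.timerService().registerEventTimeTimer(Watermark.MAX_WATERMARK.getTimestamp());
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // By the time this fires, this function has seen all of its input
        // for this key; trigger your completion notification here.
        out.collect("done:" + ctx.getCurrentKey());
    }
}
```

Note that with parallelism > 1 each parallel subtask (and each key) fires its own timer, so a global "everything is done" signal still needs a final aggregation step, e.g. a downstream operator with parallelism 1.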
Alternatively, you can implement a custom operator that implements the {{BoundedOneInput}} interface [1]. It would work the same way, but implementing a custom operator is more difficult, only semi-officially supported, and not well documented.

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html

On Mon, 12 Jul 2021 at 12:44, Tamir Sagi <tamir.s...@niceactimize.com> wrote:

> Hey Community,
>
> I'm working on a stream job that should aggregate bounded data and
> notify upon completion. (It works in Batch mode; however, I'm trying to
> achieve the same results in Stream mode, if possible.)
>
> Source: Kafka
> Sink: PostgresDB
>
> *I'm looking for an elegant way to notify upon completion.*
>
> One solution I have in mind (not perfect, but it might work):
>
>    1. Send a message to a topic for every record successfully saved
>    into the DB (from the sink).
>    2. Consume those messages externally to the cluster.
>    3. If no message is consumed for a fixed time, we assume the process
>    has finished.
>
> I was also wondering whether a TimeEventWindow with a custom trigger and
> AggregationFunction might help me here.
> However, I could not find a way to detect when all records have been
> processed within the window.
>
> I'd go with a Flink-based solution if one exists.
>
> Various references:
> flink-append-an-event-to-the-end-of-finite-datastream
> <https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302>
> how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic
> <https://stackoverflow.com/questions/48427775/how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic>
>
> Best,
>
> Tamir.
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
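P.S. For completeness, a rough sketch of the {{BoundedOneInput}} alternative mentioned above. The class name {{EndOfInputNotifier}} and the emitted marker record are made up for illustration, and this relies on semi-official internal operator APIs, so treat it as a sketch rather than a supported recipe:

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Hypothetical custom operator: forwards records unchanged and gets a
// callback when its (bounded) input has ended.
public class EndOfInputNotifier extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String>, BoundedOneInput {

    @Override
    public void processElement(StreamRecord<String> element) {
        output.collect(element);
    }

    @Override
    public void endInput() {
        // Called once per subtask, after all of its input has been processed;
        // emit a marker (or trigger your notification) here.
        output.collect(new StreamRecord<>("input finished"));
    }
}
```

You would attach it with something like {{stream.transform("end-of-input", Types.STRING, new EndOfInputNotifier())}}; as with the timer approach, with parallelism > 1 each subtask signals separately.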