Hi Tamir,

Sorry, I missed that you want to use Kafka. In that case I would suggest trying out the new KafkaSource [1] interface and its built-in boundedness support [2][3]. Maybe it will do the trick? If you want to be notified explicitly about the completion of such a bounded Kafka stream, you can still use the `Watermark#MAX_WATERMARK` trick mentioned above.
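Something along these lines for the bounded source (an untested sketch; the broker address, topic and group id are placeholders you would replace with your own):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder
        .setTopics("input-topic")             // placeholder
        .setGroupId("my-group")               // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        // makes the stream bounded: the source stops at the offsets
        // that are the latest ones at the time the job starts
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

Once the source reaches those offsets it finishes, and the job can terminate the way a batch job would.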
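And the `Watermark#MAX_WATERMARK` trick could look roughly like this (also an untested sketch; `EndOfStreamNotifier` and the println are placeholders for whatever notification mechanism you actually need):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

public class EndOfStreamNotifier<K, T> extends KeyedProcessFunction<K, T, T> {

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        // timers with the same timestamp are deduplicated per key, so this is
        // cheap to call for every record; the timer can only fire once the
        // MAX_WATERMARK emitted by the finishing source arrives
        ctx.timerService().registerEventTimeTimer(Watermark.MAX_WATERMARK.getTimestamp() - 1);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) {
        // this function has seen all records for this key; keep in mind that
        // downstream operators may still be processing the last records
        System.out.println("bounded stream fully processed for key " + ctx.getCurrentKey());
    }
}

You would put it on a keyed stream somewhere before the sink, e.g. stream.keyBy(...).process(new EndOfStreamNotifier<>()).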
If not, can you let us know what is not working?

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-

Wed, Jul 14, 2021 at 11:59 Tamir Sagi <tamir.s...@niceactimize.com> wrote:

> Hey Piotr,
>
> Thank you for your response.
>
> I saw the exact same suggestion in the answer by David Anderson [1] but did
> not really understand how it may help.
>
> Sources when finishing are emitting
> {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}
>
> Assuming 10 messages are sent to a Kafka topic, processed and saved into the DB:
>
> 1. Kafka is not considered a finite source; after the 10th element it
> will wait for more input, no?
> 2. In such a case, will the 10th element be marked with MAX_WATERMARK or
> not? Or at some point in the future?
>
> Now, let's say the 10th element is marked with MAX_WATERMARK. How
> will I know when all elements have been saved into the DB?
>
> Here is the execution graph:
> Source(Kafka) --> Operator --> Operator 2 --> Sink(PostgreSQL)
>
> Would you please elaborate on the event time timer function? Where exactly
> would it be integrated into the aforementioned execution graph?
>
> Another question I have, based on our discussion: if the only thing that
> changed is the source, and apart from that the entire flow is the
> same (operators and sink), is there any good practice to achieve a single
> job for that?
>
> Tamir.
>
> [1] https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
>
> ------------------------------
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* Tuesday, July 13, 2021 4:54 PM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Process finite stream and notify upon completion
>
> *EXTERNAL EMAIL*
>
> Hi,
>
> Sources when finishing are emitting
> {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}, so I
> think the best approach is to register an event time timer for
> {{Watermark#MAX_WATERMARK}} or maybe {{Watermark#MAX_WATERMARK - 1}}. If
> your function registers such a timer, it would be processed after
> processing all of the records by that function (keep in mind Flink is a
> distributed system, so downstream operators/functions might still be busy
> for some time processing the last records while upstream operators/functions
> are already finished).
>
> Alternatively, you can also implement a custom operator that implements
> the {{BoundedOneInput}} interface [1]. It would work in the same way, but
> implementing a custom operator is more difficult, only semi-officially
> supported and not well documented.
>
> Best,
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html
>
> Mon, Jul 12, 2021 at 12:44 Tamir Sagi <tamir.s...@niceactimize.com> wrote:
>
> Hey Community,
>
> I'm working on a stream job that should aggregate bounded data and
> notify upon completion. (It works in Batch mode; however, I'm trying to
> achieve the same results in Stream mode, if possible.)
>
> Source: Kafka
> Sink: PostgresDB
>
> *I'm looking for an elegant way to notify upon completion.*
>
> One solution I have in mind (not perfect, but it might work):
>
> 1. Send a message to a topic for every record which is successfully
> saved into the DB (from the sink)
> 2. Consume those messages externally to the cluster
> 3. If no message is consumed for a fixed time, we assume the process
> has finished
>
> I was also wondering if a TimeEventWindow with a custom trigger and
> AggregationFunction may help me here.
> However, I could not find a way to detect when all records have been
> processed within the window.
>
> I'd go with a Flink-based solution if one exists.
>
> Various references:
> flink-append-an-event-to-the-end-of-finite-datastream
> <https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302>
> how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic
> <https://stackoverflow.com/questions/48427775/how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic>
>
> Best,
>
> Tamir.