Hey Piotr,

Thank you for the fast response.

The refs are good; however, to be honest, I'm a little confused about the 
trick with MAX_WATERMARK. Maybe I'm missing something.
You wrote: "keep in mind Flink is a distributed system so downstream 
operators/functions might still be busy for some time processing last records, 
while upstream operators/functions are already finished".
Based on your suggestion and [1], I'm trying to understand how MAX_WATERMARK 
could be useful in such a scenario if it might be processed before 
MAX_WATERMARK - 1.

According to [2], MAX_WATERMARK is "the watermark that signifies end-of-event-time."
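A note on the MAX_WATERMARK - 1 question, sketched as a simplified plain-Java model (not Flink's implementation; the class and method names are hypothetical). Within one input channel, a watermark travels in order behind the records emitted before it, so it cannot overtake them. An operator with several input channels advances its event time to the minimum of the channels' latest watermarks (this model ignores idle channels), so it reaches MAX_WATERMARK only after every upstream instance has finished. Because a timer fires once the operator's watermark is at least the timer's timestamp, timers at MAX_WATERMARK - 1 and at MAX_WATERMARK would, in practice, both fire on that final MAX_WATERMARK rather than before it.

```java
import java.util.Arrays;

/**
 * Simplified model (hypothetical, not Flink's code) of how an operator with
 * several input channels combines their watermarks: its event time is the
 * minimum of the latest watermark seen on each channel, and never goes back.
 */
final class WatermarkValveModel {
    // Mirrors Watermark.MAX_WATERMARK.getTimestamp().
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final long[] channelWatermarks;
    private long combined = Long.MIN_VALUE;

    WatermarkValveModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one channel and returns the operator's combined watermark. */
    long onWatermark(int channel, long watermark) {
        // Per-channel watermarks are monotonic; a lower value is ignored.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        combined = Math.max(combined, min);
        return combined;
    }

    /** True once every channel has delivered MAX_WATERMARK. */
    boolean endOfEventTime() {
        return combined == MAX_WATERMARK;
    }
}
```

With two channels, watermarks 100 and 50 combine to 50, and the combined value becomes Long.MAX_VALUE only after both channels have sent MAX_WATERMARK.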

Thank you,

Tamir.

[1] Timely Stream Processing | Apache Flink: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#notions-of-time-event-time-and-processing-time

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/watermark/Watermark.html#MAX_WATERMARK


________________________________
From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Wednesday, July 14, 2021 1:36 PM
To: Tamir Sagi <tamir.s...@niceactimize.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Process finite stream and notify upon completion


Hi Tamir,

Sorry, I missed that you want to use Kafka. In that case I would suggest trying 
out the new KafkaSource [1] interface and its built-in boundedness support 
[2][3]. Maybe it will do the trick? If you want to be notified explicitly about 
the completion of such a bounded Kafka stream, you can still use the 
`Watermark#MAX_WATERMARK` trick mentioned earlier.

If not, can you let us know what is not working?
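A bounded KafkaSource is configured through KafkaSourceBuilder#setBounded(OffsetsInitializer) [3], for example with OffsetsInitializer.latest() as the stopping offsets. The plain-Java model below (hypothetical names, not the connector's code) illustrates the idea: the stop offsets are snapshotted when reading starts (assumed here to happen once, at startup), records appended afterwards are not read, and so the source can finish and, as described earlier in this thread, emit MAX_WATERMARK. An unbounded Kafka source never reaches that point.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a bounded topic read: each partition is read from
 * offset 0 (like "earliest") up to a stop offset captured beforehand.
 */
final class BoundedTopicReaderModel {
    /** Snapshot of every partition's current end offset (its log size). */
    static Map<Integer, Integer> latestOffsets(Map<Integer, List<String>> partitions) {
        Map<Integer, Integer> stop = new HashMap<>();
        partitions.forEach((partition, log) -> stop.put(partition, log.size()));
        return stop;
    }

    /** Reads each partition up to its stop offset, then returns (the "finish"). */
    static List<String> readBounded(Map<Integer, List<String>> partitions,
                                    Map<Integer, Integer> stopOffsets) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, Integer> entry : stopOffsets.entrySet()) {
            List<String> log = partitions.get(entry.getKey());
            for (int offset = 0; offset < entry.getValue(); offset++) {
                out.add(log.get(offset));
            }
        }
        // A real bounded source would now finish and emit MAX_WATERMARK.
        return out;
    }
}
```

A record produced after the snapshot (for example an 11th message) is not part of this bounded read; it would be picked up only by a later run.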

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-


On Wed, Jul 14, 2021 at 11:59 Tamir Sagi 
<tamir.s...@niceactimize.com<mailto:tamir.s...@niceactimize.com>> wrote:
Hey Piotr,

Thank you for your response.

I saw exactly this suggestion in an answer by David Anderson [1] but did not 
really understand how it would help.
Regarding "Sources when finishing are emitting 
{{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}": assume 
10 messages are sent to a Kafka topic, processed, and saved into the DB.

  1.  Kafka is not considered a finite source; after the 10th element it will 
wait for more input, no?
  2.  In that case, will the 10th element be marked with MAX_WATERMARK or not? 
Or only at some point in the future?

Now, let's say the 10th element is marked with MAX_WATERMARK. How will I know 
when all elements have been saved into the DB?

Here is the execution graph:
Source(Kafka) --> Operator --> Operator 2 --> Sink(PostgreSQL)

Would you please elaborate on the event-time timer function? Where exactly 
would it be integrated into the aforementioned execution graph?
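One way to picture where the completion hook fits in Source(Kafka) --> Operator --> Operator 2 --> Sink(PostgreSQL): it belongs in the last stage, the one that writes to the DB, because MAX_WATERMARK reaches that stage only after Operator and Operator 2 have forwarded all their records. With sink parallelism greater than 1, each sink instance receives its own MAX_WATERMARK, so "all rows saved" holds only once every instance has reported. The plain-Java model below is hypothetical; in particular, how the per-instance reports would be gathered in a real job is an assumption left open here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of the last stage of the pipeline. Each parallel sink
 * instance reports completion when MAX_WATERMARK reaches it; the job-wide
 * condition holds only after all instances have reported.
 */
final class SinkCompletionModel {
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    final List<String> database = new ArrayList<>();   // stands in for PostgreSQL
    private final int parallelism;
    private final Set<Integer> finishedSubtasks = new HashSet<>();

    SinkCompletionModel(int parallelism) {
        this.parallelism = parallelism;
    }

    /** A record arriving at any sink instance is "saved into the DB". */
    void onRecord(String row) {
        database.add(row);
    }

    /** A watermark arriving at sink instance {@code subtask}. */
    void onWatermark(int subtask, long watermark) {
        if (watermark == MAX_WATERMARK) {
            finishedSubtasks.add(subtask); // this instance has written all its rows
        }
    }

    boolean allRowsSaved() {
        return finishedSubtasks.size() == parallelism;
    }
}
```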

Another question, based on our discussion: if the only thing that changes is 
the source, and apart from that the entire flow is the same (operators and 
sink), is there a good practice for achieving this with a single job?
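A common way to keep one job definition while swapping only the source is to build the shared part in a single method that takes the input stream as a parameter. The sketch below uses java.util.stream as a stand-in; in Flink the parameter would be a DataStream and the method would also attach the sink, with the source chosen, for example, from a program argument. The method names and the two placeholder operators are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: one pipeline definition, reusable with any source. */
final class SharedPipeline {
    /** The shared operators; only the input differs between runs. */
    static Stream<String> build(Stream<String> input) {
        return input
                .map(String::trim)           // placeholder for "Operator"
                .filter(s -> !s.isEmpty());  // placeholder for "Operator 2"
    }

    /** Collecting into a list stands in for the sink. */
    static List<String> run(Stream<String> source) {
        return build(source).collect(Collectors.toList());
    }
}
```

Both SharedPipeline.run(Stream.of(" a ", "", "b")) and a run over a different source go through exactly the same operators.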

Tamir.

[1] https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
________________________________
From: Piotr Nowojski <pnowoj...@apache.org<mailto:pnowoj...@apache.org>>
Sent: Tuesday, July 13, 2021 4:54 PM
To: Tamir Sagi <tamir.s...@niceactimize.com<mailto:tamir.s...@niceactimize.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org> 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Process finite stream and notify upon completion


Hi,

Sources, when finishing, emit 
{{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}, so I 
think the best approach is to register an event-time timer for 
{{Watermark#MAX_WATERMARK}} or maybe {{Watermark#MAX_WATERMARK - 1}}. If your 
function registers such a timer, it will be processed after that function has 
processed all of its records (keep in mind Flink is a distributed system so 
downstream operators/functions might still be busy for some time processing 
last records, while upstream operators/functions are already finished).
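In Flink this would typically be a KeyedProcessFunction that calls ctx.timerService().registerEventTimeTimer(Watermark.MAX_WATERMARK.getTimestamp() - 1) in processElement and sends the notification from onTimer. Event-time timers require a keyed stream and are scoped per key, so with many keys the timer fires once per key. The plain-Java model below (hypothetical names, not Flink's timer service) shows why the timer fires only after the instance's last record: timers fire when the watermark reaches their timestamp, and ordinary watermarks stay far below MAX_WATERMARK - 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of one function instance with an end-of-input timer. */
final class EndOfInputTimerModel {
    static final long MAX_WATERMARK = Long.MAX_VALUE;
    static final long END_TIMER = MAX_WATERMARK - 1;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;
    private int processed = 0;
    final List<String> output = new ArrayList<>();

    void processElement(String value) {
        processed++;
        // Registering the same timestamp twice is deduplicated, as in Flink.
        if (!timers.contains(END_TIMER)) {
            timers.add(END_TIMER);
        }
    }

    void processWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        // Fire every timer whose timestamp the watermark has reached, in order.
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            onTimer(timers.poll());
        }
    }

    private void onTimer(long timestamp) {
        output.add("done after " + processed + " records"); // the "notification"
    }
}
```

After ten records, a watermark of 1000 fires nothing; the final MAX_WATERMARK fires the timer once, producing "done after 10 records".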

Alternatively, you can implement a custom operator that implements the 
{{BoundedOneInput}} interface [1]. It would work in the same way, but 
implementing a custom operator is more difficult, only semi-officially 
supported, and not well documented.
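BoundedOneInput adds a single callback, endInput(), which Flink calls once an operator's input has ended; a custom operator would typically extend AbstractStreamOperator and implement both OneInputStreamOperator and BoundedOneInput. The plain-Java model below does not use Flink. It defines a hypothetical stand-in interface and a hypothetical batching DB writer that flushes its last partial batch in endInput() and then marks itself complete, which is where a real operator could send its notification.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's BoundedOneInput (whose method is endInput()). */
interface EndOfInputAware {
    void endInput() throws Exception;
}

/** Hypothetical DB writer that batches rows and flushes the rest at end of input. */
final class BatchingWriterModel implements EndOfInputAware {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> flushedBatches = new ArrayList<>(); // stands in for DB writes
    boolean completed = false;

    BatchingWriterModel(int batchSize) {
        this.batchSize = batchSize;
    }

    void processElement(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    @Override
    public void endInput() {
        flush();          // write the last, possibly partial, batch
        completed = true; // a real operator could notify an external system here
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            flushedBatches.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

With a batch size of 4 and 10 rows, two full batches are written during processing and the remaining 2 rows only in endInput().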

Best,
Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html

On Mon, Jul 12, 2021 at 12:44 Tamir Sagi 
<tamir.s...@niceactimize.com<mailto:tamir.s...@niceactimize.com>> wrote:
Hey Community,

I'm working on a streaming job that should aggregate bounded data and notify 
upon completion. (It works in batch mode; however, I'm trying to achieve the 
same results in streaming mode, if possible.)

Source: Kafka
Sink: PostgreSQL

I'm looking for an elegant way to notify upon completion.

One solution I have in mind (not perfect, but it might work):

  1.  Send a message to a topic for every record that was successfully saved 
into the DB (from the sink).
  2.  Consume those messages outside the cluster.
  3.  If no message is consumed for a fixed time, assume the process has 
finished.
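The external check in step 3 can be sketched as a small inactivity detector. This is a hypothetical plain-Java helper, not part of Flink or Kafka; time is passed in explicitly as milliseconds so the rule is easy to test. Its weakness is visible in the code: a slow or stalled sink looks exactly like a finished one.

```java
/**
 * Hypothetical detector for step 3: "finished" means no confirmation message
 * has been consumed for at least timeoutMillis.
 */
final class QuiescenceDetector {
    private final long timeoutMillis;
    private long lastMessageMillis;
    private long confirmations = 0;

    QuiescenceDetector(long startMillis, long timeoutMillis) {
        this.lastMessageMillis = startMillis;
        this.timeoutMillis = timeoutMillis;
    }

    /** Called for every "record saved" message consumed from the topic. */
    void onConfirmation(long nowMillis) {
        confirmations++;
        lastMessageMillis = Math.max(lastMessageMillis, nowMillis);
    }

    /** True once no confirmation has arrived for at least the timeout. */
    boolean assumeFinished(long nowMillis) {
        return nowMillis - lastMessageMillis >= timeoutMillis;
    }

    long confirmations() {
        return confirmations;
    }
}
```

With a 5-second timeout and the last confirmation at 2000 ms, the check is still false at 6000 ms and becomes true at 7000 ms.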

I was also wondering whether an event-time window with a custom trigger and an 
AggregateFunction might help here. However, I could not find a way to detect 
when all records within the window have been processed.

I'd go with a Flink-based solution if one exists.

Various References
flink-append-an-event-to-the-end-of-finite-datastream<https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302>
how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic<https://stackoverflow.com/questions/48427775/how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic>

Best,

Tamir.


Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.
