Hi Tamir,
a nice property of watermarks is that they are kind of synchronized
across input operators and their partitions (i.e. parallel instances).
Bounded sources will emit a final MAX_WATERMARK once they have processed
all data. When you receive a MAX_WATERMARK in your current operator, you
can be sure that all data has been processed upstream and that all
records have arrived at your operator's parallel instance.
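If it helps, here is a minimal, untested sketch (the class name is
mine) of observing that final watermark in a custom operator, attached
via DataStream#transform:

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Pass-through operator that logs once the final watermark arrives.
public class MaxWatermarkObserver<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    @Override
    public void processElement(StreamRecord<T> element) {
        // Forward every record unchanged.
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        if (mark.getTimestamp() == Watermark.MAX_WATERMARK.getTimestamp()) {
            // All upstream instances have finished and all of their
            // records have arrived at this parallel instance.
            LOG.info("Received MAX_WATERMARK, input is exhausted.");
        }
    }
}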
Regards,
Timo
On 14.07.21 15:05, Tamir Sagi wrote:
Hey Piotr,
Thank you for the fast response.
The refs are good; however, to be honest, I'm a little confused
regarding the trick with MAX_WATERMARK. Maybe I'm missing something.
"keep in mind Flink is a distributed system so downstream
operators/functions might still be busy for some time processing last
records, while upstream operators/functions are already finished."
I'm trying to understand, based on your suggestion and ref [1], how
MAX_WATERMARK could be useful in such a scenario if it might be
processed before the {{MAX_WATERMARK - 1}} timer fires.
According to [2], MAX_WATERMARK is "the watermark that signifies
end-of-event-time."
Thank you,
Tamir.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#notions-of-time-event-time-and-processing-time
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/watermark/Watermark.html#MAX_WATERMARK
------------------------------------------------------------------------
*From:* Piotr Nowojski <pnowoj...@apache.org>
*Sent:* Wednesday, July 14, 2021 1:36 PM
*To:* Tamir Sagi <tamir.s...@niceactimize.com>
*Cc:* user@flink.apache.org <user@flink.apache.org>
*Subject:* Re: Process finite stream and notify upon completion
Hi Tamir,
Sorry, I missed that you want to use Kafka. In that case I would
suggest trying out the new KafkaSource [1] interface and its built-in
boundedness support [2][3]. Maybe it will do the trick? If you want to
be notified explicitly about the completion of such a bounded Kafka
stream, you can still use the `Watermark#MAX_WATERMARK` trick
mentioned above.
If not, can you let us know what is not working?
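For illustration, a minimal sketch of building such a bounded source
(broker, topic, and group names are placeholders, and `env` is assumed
to be your StreamExecutionEnvironment):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder
        .setTopics("input-topic")             // placeholder
        .setGroupId("my-group")               // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        // Stop (and emit MAX_WATERMARK) once the offsets that were
        // the latest at job start have been consumed.
        .setBounded(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka");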
Best,
Piotrek
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-
Wed, 14 Jul 2021 at 11:59, Tamir Sagi <tamir.s...@niceactimize.com>
wrote:
Hey Piotr,
Thank you for your response.
I saw the same suggestion in an answer by David Anderson [1] but did
not really understand how it might help.
"Sources when finishing are emitting
{{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}"
Assuming 10 messages are sent to a Kafka topic, processed, and saved
into the DB:
1. Kafka is not considered a finite source; after the 10th element
it will wait for more input, no?
2. In that case, will the 10th element be marked with MAX_WATERMARK
or not? Or at some point in the future?
Now, let's say the 10th element is marked with MAX_WATERMARK.
How will I know when all elements have been saved into the DB?
Here is the execution graph:
Source (Kafka) --> Operator 1 --> Operator 2 --> Sink (PostgreSQL)
Would you please elaborate on the event-time timer function? Where
exactly would it be integrated into the aforementioned execution graph?
Another question, based on our discussion: if the only thing that
changes is the source, and the entire flow (operators and sink) stays
the same, is there any good practice for achieving that with a single
job?
Tamir.
[1]
https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
------------------------------------------------------------------------
*From:* Piotr Nowojski <pnowoj...@apache.org>
*Sent:* Tuesday, July 13, 2021 4:54 PM
*To:* Tamir Sagi <tamir.s...@niceactimize.com>
*Cc:* user@flink.apache.org <user@flink.apache.org>
*Subject:* Re: Process finite stream and notify upon completion
Hi,
Sources, when finishing, emit
{{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}},
so I think the best approach is to register an event-time timer for
{{Watermark#MAX_WATERMARK}} or maybe {{Watermark#MAX_WATERMARK - 1}}.
If your function registers such a timer, it will be processed after
that function has processed all of its records (keep in mind Flink is
a distributed system, so downstream operators/functions might still be
busy for some time processing the last records while upstream
operators/functions are already finished).
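A minimal, untested sketch of that idea (the class and the "DONE"
marker are names I made up), assuming a keyed stream of Strings:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

// Forwards records and emits one "DONE" marker per key once the
// final watermark arrives.
public class EndOfStreamNotifier
        extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx,
            Collector<String> out) {
        // Timers are deduplicated per key and timestamp, so
        // re-registering on every record is cheap.
        ctx.timerService().registerEventTimeTimer(
                Watermark.MAX_WATERMARK.getTimestamp());
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<String> out) {
        // Fires only when the watermark reaches MAX_WATERMARK,
        // i.e. after this function has seen all of its input.
        out.collect("DONE");
    }
}

Note that with parallelism > 1 each parallel instance fires its own
timers, so you would still need to combine the per-instance signals
downstream.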
Alternatively, you can implement a custom operator that implements the
{{BoundedOneInput}} interface [1]. It would work in the same way, but
implementing a custom operator is more difficult, only semi-officially
supported, and not well documented.
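For completeness, a rough, untested sketch of that variant (the class
name is mine); you would attach it with
stream.transform("notify-on-end", stream.getType(), new NotifyOnEndOperator<>()):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Pass-through operator whose endInput() is invoked exactly once,
// after the last record has reached this parallel instance.
public class NotifyOnEndOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, BoundedOneInput {

    @Override
    public void processElement(StreamRecord<T> element) {
        output.collect(element);
    }

    @Override
    public void endInput() {
        // Place your completion notification here.
        LOG.info("End of input reached for this subtask.");
    }
}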
Best,
Piotrek
[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html
Mon, 12 Jul 2021 at 12:44, Tamir Sagi <tamir.s...@niceactimize.com>
wrote:
Hey Community,
I'm working on a stream job that should aggregate bounded data and
notify upon completion. (It works in Batch mode; however, I'm trying
to achieve the same results in Stream mode, if possible.)
Source: Kafka
Sink: PostgreSQL
*I'm looking for an elegant way to notify upon completion.*
One solution I have in mind (not perfect, but it might work):
1. Send a message to a topic for every record that is successfully
saved into the DB (from the sink).
2. Consume those messages externally to the cluster.
3. If no message is consumed for a fixed time, we assume the process
has finished.
I was also wondering if an event-time window with a custom trigger and
an AggregateFunction might help me here; however, I could not find a
way to detect when all records have been processed within the window.
I'd go with a Flink-based solution if one exists.
Various References
flink-append-an-event-to-the-end-of-finite-datastream
<https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302>
how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic
<https://stackoverflow.com/questions/48427775/how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic>
Best,
Tamir.