Hi,
Observations on Watermarks:
Read this great article:
https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy

* A watermark tells the operator, for a given event timestamp, when to stop
waiting for earlier events to arrive.
* Watermark t means all events with timestamp < t have already arrived.
* When to push data out: when a watermark with timestamp >= t arrives.
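
To make the last two points concrete, here is a minimal sketch in plain Java (no Flink dependency) of when an event-time tumbling window would be triggered. The class and method names are hypothetical, and the rule is modeled on my understanding that a window [start, end) fires once the watermark reaches end - 1.

```java
// Hypothetical helper, not part of Flink: models the firing rule for
// event-time tumbling windows under the assumptions stated above.
final class WindowFiringSketch {

    // Start of the tumbling window that contains ts (assumes ts >= 0).
    static long windowStart(long ts, long windowSizeMs) {
        return ts - (ts % windowSizeMs);
    }

    // A window [start, start + size) fires once the watermark reaches
    // its last valid timestamp, i.e. start + size - 1.
    static boolean shouldFire(long windowStart, long windowSizeMs, long watermark) {
        long maxTimestamp = windowStart + windowSizeMs - 1;
        return watermark >= maxTimestamp;
    }
}
```

For example, with an assumed window size of 5000 ms, an event at 12500 falls into the window starting at 10000, and that window only fires once a watermark of at least 14999 arrives.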

Only *using an incrementing current time for the watermark seems to work
correctly*, but I am not sure it lines up with EventTime processing.
*Using the incoming record's intervalStart as the watermark source for
EventTime causes data to not be pushed at all* when I have just 5
records in the source.

My source generates records whose intervalStart increments at a
regular interval.
I tried using intervalStart for my watermark with an out-of-order
lateness bound of 3 secs.
The *AggregateFunction* I am using calls add() fine but *never calls
getResult().*
My assumption was that the AggregateFunction would push the data to
getResult once a watermark derived from intervalStart advanced beyond
the previous watermark t.
But it doesn't. Is it because I have a limited number of input records,
so intervalStart reaches the end of the input too quickly, the watermark
stops advancing, and hence data is never pushed?
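
A rough way to check this hypothesis without Flink is to simulate it. The sketch below is plain Java under stated assumptions: the class name, the 1 second spacing between records, and the 5 second tumbling window are all hypothetical (my actual window size is not shown here), and the watermark is taken as the largest timestamp seen minus the bound, emitted after every record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical simulation, not Flink code: shows which tumbling windows
// would fire if the watermark only advances when a record arrives.
final class FiniteInputWatermarkSketch {

    // Returns the start times of the windows that fire for the given input.
    static List<Long> firedWindows(long[] timestamps, long boundMs, long windowSizeMs) {
        TreeSet<Long> openWindows = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = ts - (ts % windowSizeMs); // assumes ts >= 0
            // Records whose window is already behind the watermark are dropped.
            if (start + windowSizeMs - 1 > watermark) {
                openWindows.add(start);
            }
            // One watermark per record, never moving backwards.
            watermark = Math.max(watermark, ts - boundMs);
            // Fire every open window whose last timestamp the watermark has reached.
            while (!openWindows.isEmpty()
                    && openWindows.first() + windowSizeMs - 1 <= watermark) {
                fired.add(openWindows.pollFirst());
            }
        }
        return fired;
    }
}
```

With 5 records at 0, 1000, 2000, 3000 and 4000 ms and a 3000 ms bound, the last watermark is 1000, so the window [0, 5000) never fires in this model. That would match add() being called without getResult(). With 10 records up to 9000 ms, only the first window fires and the second one stays open.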

With System.currentTimeMillis, it happily keeps increasing and hence pushes
the data.

Created this class:
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MonitoringAssigner implements AssignerWithPunctuatedWatermarks<Monitoring> {
    // Out-of-order bound in millisecs (defaults to 3 secs)
    private long bound = 3 * 1000;

    public MonitoringAssigner(long bound) {
        this.bound = bound;
    }

    @Override
    public Watermark checkAndGetNextWatermark(Monitoring monitoring, long extractedTimestamp) {
        long nextWatermark = extractedTimestamp - bound;
        // simply emit a Watermark with every event
        return new Watermark(nextWatermark);
    }

    @Override
    public long extractTimestamp(Monitoring monitoring, long previousTS) {
        /* Using intervalStart stopped pushing recs after a certain time:
        LocalDateTime intervalStart =
            Utils.getLocalDateTime(monitoring.getIntervalStart()); // e.g. 2012-07-12 02:21:06.057
        long extractedTS = Utils.getLongFromLocalDateTime(intervalStart);
        return extractedTS;
        */
        return System.currentTimeMillis(); // incrementing current time
    }
}
