Using Flink EventTime feature, I implement the class
AssignerWithPeriodicWatermark such that:

public static class SampleTimestampExtractor implements
AssignerWithPeriodicWatermarks<Tuple3<String, Long, JSONObject>> {
    private static final long serialVersionUID = 1L;
    private long MAX_TIMESTAMP;
    private final long DELEY = 3000;


    @Override
    public long extractTimestamp(Tuple3<String, Long, JSONObject> t, long l) {
        long timestamp = t.f1 ;
        MAX_TIMESTAMP =  Math.max(timestamp , MAX_TIMESTAMP);
        System.out.println("Max TimeStamp : " + MAX_TIMESTAMP);
        return timestamp ;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        System.out.println("Current WatreMark : " + (MAX_TIMESTAMP - DELEY));
        return new Watermark(MAX_TIMESTAMP - DELEY);
    }
}

In addition, I set the watermark interval to 100 milliseconds:

env.getConfig().setAutoWatermarkInterval(100);

But when I check the logs, some watermarks are -3000, so in
getCurrentWatermark method, it considers the MAX_TIMESTAMP zero (0 - 3000 =
-3000), while I can see in the logs that the MAX_TIMESTAMP has a value
greater than zero!
Here is a part of the output:
Max TimeStamp : 1532934243136
Max TimeStamp : 1532934243136
Max TimeStamp : 1532934243144
Max TimeStamp : 1532934243144
Max TimeStamp : 1532934243152
Max TimeStamp : 1532934243152
Max TimeStamp : 1532934243160
Max TimeStamp : 1532934243160
Max TimeStamp : 1532934243168
Max TimeStamp : 1532934243168
Current WatreMark : 1532934240168
Current WatreMark : -3000
Current WatreMark : -3000
Current WatreMark : 1532934240168
Max TimeStamp : 1532934243176
Max TimeStamp : 1532934243176
Max TimeStamp : 1532934243184
Max TimeStamp : 1532934243200
Max TimeStamp : 1532934243208
Max TimeStamp : 1532934243184

Reply via email to