Using Flink's event-time feature, I implemented AssignerWithPeriodicWatermarks as follows:

    public static class SampleTimestampExtractor implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, JSONObject>> {
        private static final long serialVersionUID = 1L;
        private long MAX_TIMESTAMP;
        private final long DELEY = 3000;

        @Override
        public long extractTimestamp(Tuple3<String, Long, JSONObject> t, long l) {
            long timestamp = t.f1;
            MAX_TIMESTAMP = Math.max(timestamp, MAX_TIMESTAMP);
            System.out.println("Max TimeStamp : " + MAX_TIMESTAMP);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            System.out.println("Current WatreMark : " + (MAX_TIMESTAMP - DELEY));
            return new Watermark(MAX_TIMESTAMP - DELEY);
        }
    }

In addition, I set the watermark interval to 100 milliseconds:

    env.getConfig().setAutoWatermarkInterval(100);

But when I check the logs, some watermarks are -3000. So in the getCurrentWatermark method, MAX_TIMESTAMP appears to be zero (0 - 3000 = -3000), even though the logs show that MAX_TIMESTAMP has a value greater than zero!

Here is a part of the output:

    Max TimeStamp : 1532934243136
    Max TimeStamp : 1532934243136
    Max TimeStamp : 1532934243144
    Max TimeStamp : 1532934243144
    Max TimeStamp : 1532934243152
    Max TimeStamp : 1532934243152
    Max TimeStamp : 1532934243160
    Max TimeStamp : 1532934243160
    Max TimeStamp : 1532934243168
    Max TimeStamp : 1532934243168
    Current WatreMark : 1532934240168
    Current WatreMark : -3000
    Current WatreMark : -3000
    Current WatreMark : 1532934240168
    Max TimeStamp : 1532934243176
    Max TimeStamp : 1532934243176
    Max TimeStamp : 1532934243184
    Max TimeStamp : 1532934243200
    Max TimeStamp : 1532934243208
    Max TimeStamp : 1532934243184
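
One possible explanation, offered as an assumption rather than a confirmed diagnosis: if the job runs with parallelism greater than 1, Flink creates a separate assigner instance for each parallel subtask, and each instance has its own MAX_TIMESTAMP field. An instance that has not received any element yet would still hold the default value 0 and report 0 - 3000 = -3000, while the other instances report real values. The sketch below imitates this in plain Java without any Flink dependency. The names `SimpleAssigner` and `watermarksPerInstance`, as well as the choice of 4 instances with 2 of them receiving data, are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkParallelismSketch {

    /** Hypothetical stand-in for the assigner in the question; one instance per parallel subtask. */
    static class SimpleAssigner {
        private static final long DELAY = 3000L;
        private long maxTimestamp; // Java initializes this field to 0

        void extractTimestamp(long eventTimestamp) {
            maxTimestamp = Math.max(eventTimestamp, maxTimestamp);
        }

        long currentWatermark() {
            return maxTimestamp - DELAY;
        }
    }

    /**
     * Creates `parallelism` independent assigners, feeds the timestamps only to the
     * first `activeInstances` of them, and returns the watermark each one would emit.
     */
    static List<Long> watermarksPerInstance(int parallelism, int activeInstances, long[] timestamps) {
        List<SimpleAssigner> assigners = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            assigners.add(new SimpleAssigner());
        }
        // Assumption: only some instances ever see elements.
        for (int i = 0; i < Math.min(activeInstances, parallelism); i++) {
            for (long ts : timestamps) {
                assigners.get(i).extractTimestamp(ts);
            }
        }
        List<Long> watermarks = new ArrayList<>();
        for (SimpleAssigner assigner : assigners) {
            watermarks.add(assigner.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        long[] timestamps = {1532934243136L, 1532934243144L, 1532934243168L};
        // prints [1532934240168, 1532934240168, -3000, -3000]
        System.out.println(watermarksPerInstance(4, 2, timestamps));
    }
}
```

If this is what happens in the real job, two quick checks should confirm it: running with `env.setParallelism(1)` should make the -3000 values disappear once the first element has arrived, and adding `System.identityHashCode(this)` to both println calls should show that the -3000 lines come from different assigner instances than the "Max TimeStamp" lines.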