Hi there! I have a Flink job (v1.13.2, AWS managed) which reads from Kinesis (AWS managed, 4 shards).
For various reasons, the shards are not partitioned properly at the moment. So I wanted to use watermarks (BoundedOutOfOrdernessTimestampExtractor) and the JobManagerWatermarkTracker to avoid skew between the sources. The job runs with parallelism 4. I added the tracker as follows:

    JobManagerWatermarkTracker watermarkTracker =
        new JobManagerWatermarkTracker("watermark-tracker-" + sourceName);
    consumer.setWatermarkTracker(watermarkTracker);

I have implemented a naive map function that tracks the latest consumed event timestamp (per parallel subtask) with Flink metrics:

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;

    public class MetricsMapper extends RichMapFunction<Event, Event> {
        // transient fields are not re-initialized after deserialization, so set it in open();
        // volatile because the metrics reporter reads it from another thread
        private transient volatile long latestEventTimestamp;

        @Override
        public void open(Configuration config) {
            latestEventTimestamp = 0L;
            getRuntimeContext()
                .getMetricGroup()
                .addGroup("kinesisanalytics")
                .addGroup("Function", this.getClass().getName())
                .gauge("latestEventTimestamp", (Gauge<Long>) () -> latestEventTimestamp);
        }

        @Override
        public Event map(Event e) throws Exception {
            // same timestamp as used in my BoundedOutOfOrdernessTimestampExtractor implementation
            this.latestEventTimestamp = e.getTimestamp();
            return e;
        }
    }

Using these metrics I can see a skew of roughly 1 second among my shards. I even tried reducing ConsumerConfigConstants.WATERMARK_SYNC_MILLIS to 100 ms, but this had no impact on the skew of the event timestamps. In fact, the monitored skew is the same as when not using the watermark tracker at all.

Am I using the watermark tracker wrong? Or is there even something wrong with my naive monitoring? Help and suggestions welcome.

Best,
Peter
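To make explicit what I expect the tracker to do, here is a simplified plain-Java model. It is my own sketch, not Flink's implementation: each subtask's watermark is its highest event timestamp seen minus the out-of-orderness bound (the way a BoundedOutOfOrdernessTimestampExtractor computes it), the global watermark is the minimum over all subtasks, and a subtask whose watermark runs more than a lookahead ahead of the global one should pause. The class and method names, the 500 ms bound, the 300 ms lookahead and all timestamps are hypothetical.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of watermark alignment across source subtasks.
 * Not Flink's implementation; it only illustrates the idea.
 */
public class WatermarkAlignmentModel {

    /** Local watermark: max timestamp seen minus the bound (starts so that it cannot underflow). */
    public static long localWatermark(long[] timestampsSeen, long maxOutOfOrdernessMillis) {
        long max = Long.MIN_VALUE + maxOutOfOrdernessMillis;
        for (long ts : timestampsSeen) {
            max = Math.max(max, ts);
        }
        return max - maxOutOfOrdernessMillis;
    }

    /** Global watermark: the minimum of all local watermarks. */
    public static long globalWatermark(long[] localWatermarks) {
        return Arrays.stream(localWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** True if a subtask is more than the lookahead ahead of the global watermark. */
    public static boolean shouldPause(long localWatermark, long globalWatermark, long lookaheadMillis) {
        return localWatermark > globalWatermark + lookaheadMillis;
    }

    public static void main(String[] args) {
        long bound = 500;     // hypothetical out-of-orderness bound (ms)
        long lookahead = 300; // hypothetical lookahead (ms)
        // Hypothetical event timestamps per subtask (ms); subtask 3 is about 1 s ahead.
        long[][] seen = {
            {10_000, 10_200},
            {10_100, 10_150},
            {9_900, 10_050},
            {11_000, 11_200},
        };
        long[] local = new long[seen.length];
        for (int i = 0; i < seen.length; i++) {
            local[i] = localWatermark(seen[i], bound);
        }
        long global = globalWatermark(local);
        for (int i = 0; i < local.length; i++) {
            System.out.printf("subtask %d: watermark=%d pause=%b%n",
                i, local[i], shouldPause(local[i], global, lookahead));
        }
    }
}
```

In this model only subtask 3 would be paused, while the remaining subtasks may still differ by up to the lookahead, so some residual skew would be expected even when alignment works.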