Hi there!

I have a Flink job (v1.13.2, AWS managed) which reads from Kinesis (AWS
managed, 4 shards).

For various reasons, the shards are not partitioned properly (at the moment).
So I wanted to use watermarks (BoundedOutOfOrdernessTimestampExtractor) and
the JobManagerWatermarkTracker to avoid event-time skew between the sources.
The job is running with parallelism 4.
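To make the intent concrete, here is a toy, dependency-free Java model of source-side watermark alignment. It is a simplification under stated assumptions, not the Kinesis connector's implementation: the "global watermark" is just the smallest pending event timestamp across shards, and a shard may emit its next record only if that record is at most `lookaheadMillis` ahead of it. The class, the method names and this emission rule are all hypothetical. Skew is measured as the spread of each shard's latest emitted timestamp.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Toy model of watermark alignment between shards (hypothetical, NOT the
 * Kinesis connector's code). The "global watermark" is simplified to the
 * smallest pending timestamp; a shard may only emit a record that is at most
 * lookaheadMillis ahead of it.
 */
public class AlignmentModel {

    /** Largest skew (max - min of per-shard latest emitted timestamps) seen. */
    static long maxObservedSkew(List<long[]> shardTimestamps, long lookaheadMillis) {
        List<Deque<Long>> queues = new ArrayList<>();
        for (long[] ts : shardTimestamps) {
            Deque<Long> q = new ArrayDeque<>();
            for (long t : ts) q.addLast(t);
            queues.add(q);
        }
        int n = queues.size();
        long[] latest = new long[n];        // latest emitted timestamp per shard
        boolean[] emitted = new boolean[n]; // has the shard emitted anything yet?
        long maxSkew = 0;

        while (queues.stream().anyMatch(q -> !q.isEmpty())) {
            // Simplified global watermark: smallest pending timestamp of any shard.
            long minHead = Long.MAX_VALUE;
            for (Deque<Long> q : queues) {
                if (!q.isEmpty()) minHead = Math.min(minHead, q.peekFirst());
            }
            // Every shard whose next record is within the lookahead emits it.
            // Written as a difference so a huge lookahead cannot overflow.
            for (int i = 0; i < n; i++) {
                Deque<Long> q = queues.get(i);
                if (!q.isEmpty() && q.peekFirst() - minHead <= lookaheadMillis) {
                    latest[i] = q.pollFirst();
                    emitted[i] = true;
                }
            }
            // Skew as a per-shard "latest event timestamp" metric would show it.
            long lo = Long.MAX_VALUE, hi = Long.MIN_VALUE;
            for (int i = 0; i < n; i++) {
                if (emitted[i]) {
                    lo = Math.min(lo, latest[i]);
                    hi = Math.max(hi, latest[i]);
                }
            }
            maxSkew = Math.max(maxSkew, hi - lo);
        }
        return maxSkew;
    }

    /** count timestamps starting at 0, stepMillis apart. */
    static long[] evenlySpaced(int count, long stepMillis) {
        long[] ts = new long[count];
        for (int i = 0; i < count; i++) ts[i] = i * stepMillis;
        return ts;
    }

    public static void main(String[] args) {
        // Shard 0 is dense in event time, shard 1 is sparse, so shard 1 races ahead.
        List<long[]> shards = List.of(evenlySpaced(200, 10), evenlySpaced(20, 100));
        System.out.println("unaligned skew: " + maxObservedSkew(shards, Long.MAX_VALUE));
        System.out.println("aligned skew:   " + maxObservedSkew(shards, 100));
    }
}
```

In this model, a dense shard (one record every 10 ms) next to a sparse one (one record every 100 ms) reaches a maximum skew of 1710 ms without alignment and 100 ms with a 100 ms lookahead: alignment caps the skew at roughly the lookahead rather than driving it to zero.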

I added the tracker as follows:

JobManagerWatermarkTracker watermarkTracker =
    new JobManagerWatermarkTracker("watermark-tracker-" + sourceName);
consumer.setWatermarkTracker(watermarkTracker);

I have implemented a naive map function that tracks the latest consumed event
timestamp (per parallel subtask) via Flink metrics:

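import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class MetricsMapper extends RichMapFunction<Event, Event> {

  // Event timestamp of the most recent record seen by this subtask.
  // Transient fields are not re-initialized after deserialization, so it is set in open();
  // volatile because the metric reporter reads the gauge from another thread.
  private transient volatile long latestEventTimestamp;

  @Override
  public void open(Configuration config) {
    latestEventTimestamp = 0L;
    getRuntimeContext()
      .getMetricGroup()
      .addGroup("kinesisanalytics")
      .addGroup("Function", this.getClass().getName())
      .gauge("latestEventTimestamp", (Gauge<Long>) () -> latestEventTimestamp);
  }

  @Override
  public Event map(Event e) throws Exception {
    // Same timestamp as used in my BoundedOutOfOrdernessTimestampExtractor implementation
    this.latestEventTimestamp = e.getTimestamp();
    return e;
  }
}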
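A "latest event timestamp" gauge and a watermark are different quantities. The sketch below is a plain-Java mirror (no Flink dependency) of the watermark arithmetic in BoundedOutOfOrdernessTimestampExtractor as I understand it: the watermark trails the maximum timestamp seen so far by the out-of-orderness bound, while the gauge reports whichever record arrived last. The class name and the 500 ms bound are hypothetical.

```java
/**
 * Hypothetical plain-Java mirror of BoundedOutOfOrdernessTimestampExtractor's
 * watermark arithmetic, next to what the MetricsMapper gauge would report.
 */
public class WatermarkVsLatest {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private long latestEventTimestamp; // what a "latest timestamp" gauge reports

    WatermarkVsLatest(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start just above Long.MIN_VALUE so the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    void onEvent(long timestamp) {
        latestEventTimestamp = timestamp;
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Watermark = max timestamp seen minus the bound, never moving backwards. */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMillis;
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return lastEmittedWatermark;
    }

    long latestEventTimestamp() {
        return latestEventTimestamp;
    }

    public static void main(String[] args) {
        WatermarkVsLatest subtask = new WatermarkVsLatest(500);
        for (long ts : new long[] {1_000, 2_000, 1_200}) subtask.onEvent(ts);
        System.out.println("latest event: " + subtask.latestEventTimestamp()); // 1200
        System.out.println("watermark:    " + subtask.currentWatermark());     // 1500
    }
}
```

With events at 1000, 2000 and 1200 ms and a 500 ms bound, the gauge shows 1200 while the watermark is 1500, so comparing the gauges across subtasks is not the same as comparing their watermarks.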

Using these metrics I can see a skew of roughly 1 sec among my shards. I even
tried to reduce ConsumerConfigConstants.WATERMARK_SYNC_MILLIS to 100 ms, but
this had no impact on the skew of the event timestamps. In fact, the monitored
skew is the same as without the watermark tracker.

Am I using the watermark tracker wrong? Or is there something wrong with my
naive monitoring?

Help and suggestions welcome. 

Best,
Peter
