Hi there,

Flink 1.11 comes with the new WatermarkStrategy API to assign timestamp and 
watermark. I find there is no example in Scala.  I have a (String, Long) 
Stream, can anyone help implement WatermarkStrategy? I will be really 
gratefully!

val input: DataStream[(String, Long)] = ...
val watermark = input.assignTimestampsAndWatermarks(
    WatermarkStrategy.forGenerator(...)
    )


class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
  final private val maxOutOfOrderness = 60 * 1000

  private var currentMaxTimestamp = 0L

  override def onEvent(event: (String, Long), eventTimestamp: Long, output: 
WatermarkOutput): Unit = {

    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp)
  }

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {

    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
  }
}


Reply via email to