Hi there,
Flink 1.11 comes with the new WatermarkStrategy API to assign timestamp and
watermark. I find there is no example in Scala. I have a (String, Long)
Stream, can anyone help implement WatermarkStrategy? I will be really
gratefully!
val input: DataStream[(String, Long)] = ...
val watermark = input.assignTimestampsAndWatermarks(
WatermarkStrategy.forGenerator(...)
)
class MyPeriodicGenerator extends WatermarkGenerator[(String, Long)] {
final private val maxOutOfOrderness = 60 * 1000
private var currentMaxTimestamp = 0L
override def onEvent(event: (String, Long), eventTimestamp: Long, output:
WatermarkOutput): Unit = {
currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp)
}
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness))
}
}