Hi, I have recently experimented a bit with windowing and event-time mechanism in flink and either I do not understand how should it work or there is some kind of a bug.
I have prepared two Source Functions. One that emits watermark itself and one that does not, but I have prepared a TimestampExtractor that should produce same results that the previous Source Function, at least from my point of view. Afterwards I've prepared a simple summing over an EventTimeTriggered Sliding Window. What I expected is a sum of 3*(t_sum) property of Event regardless of the sleep time in Source Function. That is how the EventTimeSourceFunction works, but for the SourceFunction it depends on the sleep and does not equals 3*(t_sum). I have done some debugging and for the SourceFunction the output of ExtractTimestampsOperator does not chain to the aggregator operator(the property output.allOutputs is empty). Do I understand the mechanism correctly and should my code work as I described? If not could you please explain a little bit? The code I've attached to this email. I would be grateful. Regards Dawid Wysakowicz
import java.util.concurrent.TimeUnit import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.TimestampExtractor import org.apache.flink.streaming.api.functions.source.{SourceFunction, EventTimeSourceFunction} import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger object FlinkWindows { case class Event(name: String, toSum: Int, timestamp: Long) private var hostName: String = null private var port: Int = 0 def main(args: Array[String]) { hostName = args(0) port = args(1).toInt val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // env.getConfig.setAutoWatermarkInterval(0) //Read from a socket stream at map it to StockPrice objects // val stream = env.socketTextStream(hostName, port).map(x => { // val split = x.split(",") // Event(split(0), split(1).toInt, split(2).toLong) // }) val stream = env.addSource(new EventSourceWithTimestamp) // val stream = env.fromCollection(genCarStream()) .keyBy("name") stream.assignTimestamps(new TimestampExtractor[Event] { private var currentWatermark: Long = Long.MinValue override def getCurrentWatermark: Long = currentWatermark override def extractWatermark(element: Event, currentTimestamp: Long): Long = { if (element.timestamp > currentWatermark) { currentWatermark = element.timestamp - 999 } currentWatermark } override def extractTimestamp(element: Event, currentTimestamp: Long): Long = element.timestamp }) val sumed = stream .window(SlidingTimeWindows.of( Time.of(3, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS) )) .trigger(EventTimeTrigger.create()) .sum("toSum") .print() env.execute("Stock stream") } class EventSource extends SourceFunction[Event] { var isRunning = true val offset = System.currentTimeMillis() override def cancel(): Unit = isRunning = false override def run(ctx: SourceContext[Event]): Unit = { var idx = 0 while (isRunning) { Thread.sleep(1500) ctx.collect(Event("a", 3, offset + idx * 999)) ctx.collect(Event("b", 2, offset + idx * 999)) idx += 1 } } } class EventSourceWithTimestamp extends EventTimeSourceFunction[Event] { var isRunning = true val offset = System.currentTimeMillis() override def cancel(): Unit = isRunning = false override def run(ctx: SourceContext[Event]): Unit = { var idx = 0 while (isRunning) { Thread.sleep(1500) ctx.collectWithTimestamp(Event("a", 3, offset + idx * 999), offset + idx * 999) ctx.collectWithTimestamp(Event("b", 2, offset + idx * 999), offset + idx * 999) ctx.emitWatermark(new Watermark(offset + (idx - 1) * 999)) idx += 1 } } } }