Dear list-members, I have a question regarding window-firing and element accumulation for a slidindingwindow on a DataStream (Flink 1.8.1-2.12).
My DataStream is derived from a custom SourceFunction, which emits stirng-sequences of WINDOW size, in a deterministic sequence. The aim is to crete sliding windows over the keyedstream for processing on the accumulated strings, based on EventTime. To assign EventTime and Watermark, I attech an AssignerWithPeriodicWaterMarks, to the stream. The sliding window is processed with a custom ProcessWindowFunction. env.setStreamTimeCharacteristic(EventTime) val seqStream = env.addSource(Seqstream) .assignTimestampsAndWatermarks(SeqTimeStampExtractor()) .keyBy(getEventtimeKey) .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize))) val result = seqStream.process(ProcessSeqWindow(target1)) My AssignerWithPeriodicWaterMarks looks like this. class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> { var waterMark = 9999L override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long { return element.f1 } override fun getCurrentWatermark(): Watermark? { waterMark += 1 return Watermark(waterMark) } } In other words, each element emitted by the source should have its own EvenTime, and the WaterMark should be emitted allowing no further events for that time. Stepping through the stream in a debugger, indicates that EventTime / Watremarks are generated as would expect. My expectation is that ProcessSeqWindow.run() ought to be called with a number of elements proportional to the time window (e.g. 10 ms), over EventTime. However, what I observe is that run() is called multiple times with single elemnts, and in an arbitrary sequence with respect to EventTime. My question is whether this is likely to be caused by multiple trigger-events on each window, or are there other possible explainations? How can I debug the cause? Thanks, Eric