Hey guys, I have a KeyedProcessFunction that gathers statistics on the events flowing through it and periodically (every few seconds) emits them to a side output. However, at the end of the stream the last set of statistics is never emitted. I read on the mailing list that pending processing-time timers are not triggered when Flink cleans up a stream, but that event-time timers are, because a watermark with Long.MAX_VALUE is sent through the stream. Hence, I thought I could register a "backup" event-time timer for Long.MAX_VALUE-1 to make sure that my process function is notified when the stream ends, so that it can emit the in-flight statistics.
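To make the behaviour I am relying on concrete, here is a toy model in plain Java. It is not Flink code: the class EndOfStreamTimerModel and its endOfStream(), advanceWatermark() and advanceProcessingTime() methods are hypothetical names made up for this sketch, and only the two register... method names mirror Flink's TimerService. It encodes my understanding from the mailing list (a final Long.MAX_VALUE watermark fires event-time timers, pending processing-time timers are dropped), which may well be incomplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of the timer semantics described above. NOT Flink code:
// every name here is hypothetical and only illustrates my assumption.
final class EndOfStreamTimerModel {
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    private final TreeSet<Long> processingTimeTimers = new TreeSet<>();
    private final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    void registerProcessingTimeTimer(long timestamp) {
        processingTimeTimers.add(timestamp);
    }

    // A watermark fires every event-time timer whose timestamp is <= the watermark.
    void advanceWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            fired.add("event@" + eventTimeTimers.pollFirst());
        }
    }

    // The wall clock fires every processing-time timer whose timestamp is <= now.
    void advanceProcessingTime(long now) {
        while (!processingTimeTimers.isEmpty() && processingTimeTimers.first() <= now) {
            fired.add("processing@" + processingTimeTimers.pollFirst());
        }
    }

    // Assumed end-of-input behaviour: a final watermark of Long.MAX_VALUE is
    // sent, and processing-time timers that are still pending are discarded.
    void endOfStream() {
        advanceWatermark(Long.MAX_VALUE);
        processingTimeTimers.clear();
    }

    List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        EndOfStreamTimerModel model = new EndOfStreamTimerModel();
        model.registerProcessingTimeTimer(5_000L);        // periodic statistics timer, not yet due
        model.registerEventTimeTimer(Long.MAX_VALUE - 1); // the "backup" end-of-stream timer
        model.advanceProcessingTime(1_000L);              // input is exhausted after one second
        model.endOfStream();
        System.out.println(model.fired());                // only the backup event-time timer fired
    }
}
```

In this model the periodic processing-time timer is lost and only the backup event-time timer fires, which is exactly the effect I was hoping to get in the real job.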
However, now my simple test case (with a fromCollection data source of 4 elements) keeps iterating over the same 4 elements in an infinite loop. I can't make sense of this and would appreciate your help. Is there a better way to set a timer that fires at the end of the stream? And for my education: why does registering an event-time timer cause an infinite loop over the source elements?

Thanks a lot and have a wonderful weekend,
Matthias