We run a large-scale Flink 1.16 cluster that uses windowed aggregations, and we are seeing lag spikes when windows close. I'm curious whether others have run into similar issues and whether anyone has suggestions for how to tackle this problem (other than simply increasing parallelism).

Context

Lag definition

We define end-to-end lag as the delta between the time an event was persisted in Kafka and the time Flink finishes processing that event.

Window operator definition

The important parts (in pseudocode):

val windowDataStream = inputDataStream
  .keyBy(...)                                           // keyed by our aggregation key
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .trigger(new CustomTrigger(...))
  .aggregate(
    preAggregator,    // custom AggregateFunction
    windowFunction    // custom ProcessWindowFunction
  )

The custom trigger returns TriggerResult.CONTINUE in onEventTime, i.e. we don't run any user-defined logic at the end of the window. The trigger only fires while the window is active, via custom logic in onElement.
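For concreteness, the trigger looks roughly like this. The class name and the shouldFire predicate below are simplified stand-ins for our actual firing logic, and FIRE vs. FIRE_AND_PURGE is not the interesting part here:

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Simplified stand-in: fires only from onElement while the window is active,
// and never runs user-defined logic when the window's end-of-window timer fires.
class CustomTrigger[T](shouldFire: T => Boolean) extends Trigger[T, TimeWindow] {

  // Custom per-element logic decides whether to fire while the window is active.
  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult =
    if (shouldFire(element)) TriggerResult.FIRE else TriggerResult.CONTINUE

  // No user-defined logic at the end of the window.
  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}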
Numbers

Our Flink app processes ~3K events per second, and I've calculated that there are around 200-300K panes to close per Task at the end of the 1-hour window.

Our lag is fairly stable at a few hundred milliseconds during the window but spikes to 5-10 seconds when the window expires, which is a problem for us.

The issue

The magnitude of the lag spikes on window closure correlates with:

- the size of the window (a 1-hour window has bigger spikes than a 5-minute window),
- the cardinality of the keys in the event stream,
- the number of events processed per second.

In other words, the more panes there are to close, the bigger the lag spike.

I'm fairly sure the lag is coming entirely from the WindowOperator's clearAllState, and I've validated that CPU profiles show clearAllState using a significant amount of CPU. Does this theory sound plausible?

What could we do to minimize the cost of window clean-up? It would be nice to do it incrementally or asynchronously, but I'm not sure Flink provides this functionality today.

Thanks,
Caio