Hey Asimansu,

The inputDataStream is a KeyedStream; I forgot to mention that.
Caio

On Wed, Mar 20, 2024 at 6:56 PM Asimansu Bera <asimansu.b...@gmail.com> wrote:

> Hello Caio,
>
> Based on the pseudocode, there is no keyed function present, so the
> window will not be processed in parallel. Please check again and respond.
>
>     val windowDataStream =
>       inputDataStream
>         .window(TumblingEventTimeWindows of 1 hour)
>         .trigger(custom trigger)
>         .aggregate(
>           preAggregator = custom AggregateFunction,
>           windowFunction = custom ProcessWindowFunction
>         )
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>
> -A
>
> On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <user@flink.apache.org> wrote:
>
>> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
>> and we're seeing lag spikes on window closure. I'm curious if others have
>> encountered similar issues before and if anyone has suggestions for how to
>> tackle this problem (other than simply increasing parallelism).
>>
>> Context
>>
>> Lag definition
>>
>> We define end-to-end lag as the delta between the time when the event
>> was persisted in Kafka and the time when Flink finishes processing the
>> event.
>>
>> Window operator definition
>>
>> The important parts (in pseudocode):
>>
>>     val windowDataStream =
>>       inputDataStream
>>         .window(TumblingEventTimeWindows of 1 hour)
>>         .trigger(custom trigger)
>>         .aggregate(
>>           preAggregator = custom AggregateFunction,
>>           windowFunction = custom ProcessWindowFunction
>>         )
>>
>> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e.
>> we don't run any user-defined logic at the end of the window. (This trigger
>> only fires while the window is active, via custom logic in onElement.)
>>
>> Numbers
>>
>> Our Flink app processes ~3K events per second and I've calculated that
>> there are around 200-300K panes to close per Task at the end of the 1-hour
>> window.
>> Our lag is fairly stable at a few hundred milliseconds during
>> the window but spikes to 5-10 seconds when the window expires, which is a
>> problem for us.
>>
>> The issue
>>
>> The magnitude of the lag spike on window closure correlates with:
>>
>> - the size of the window (a 1-hour window has bigger spikes than a
>>   5-minute window),
>> - the cardinality of the keys in the event stream, and
>> - the number of events being processed per second.
>>
>> In other words, the more panes to close, the bigger the lag spike. I'm
>> fairly sure that the lag is coming entirely from the WindowOperator's
>> clearAllState, and CPU profiles confirm that clearAllState uses a
>> significant amount of CPU.
>>
>> Does this theory sound plausible? What could we do to minimize the
>> effects of window clean-up? It would be nice to do it incrementally or
>> asynchronously but I'm not sure if Flink provides this functionality today.
>>
>> Thanks,
>> Caio
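For anyone following this thread: the burst Caio describes can be made concrete with a toy model (plain Java, not Flink code, and not Flink's actual implementation). At window expiry the operator has to delete one piece of state per live pane, and all of those deletes land in the same event-time step on the operator's single processing thread, so the cleanup cost is linear in the number of panes. The class name and the 250K figure below are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of window-expiry cleanup (illustrative only, not Flink's implementation).
public class PaneCleanupModel {

    // Clearing state for every pane is one delete per key: O(panes) work
    // that all lands in the same event-time step when the window expires.
    static long clearAllState(Map<String, Long> paneState) {
        long deletes = paneState.size();
        paneState.clear();
        return deletes;
    }

    public static void main(String[] args) {
        Map<String, Long> paneState = new HashMap<>();
        int activeKeys = 250_000; // roughly the 200-300K panes per Task cited above
        for (int k = 0; k < activeKeys; k++) {
            paneState.put("key-" + k, 1L); // one accumulator (pane) per active key
        }
        // During the window, each event touches only its own pane: cheap and steady.
        // At expiry, all the deletes run back to back, which is the lag spike.
        System.out.println("state deletes at window end: " + clearAllState(paneState));
    }
}
```

This is consistent with the observation that spike size tracks key cardinality and window length: both increase the number of live panes, and the cleanup work is proportional to that count regardless of event rate at the moment of expiry.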
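On the "do it incrementally" question: one possible direction (a sketch of an idea, not an existing Flink feature) is to stagger window end times across key groups, e.g. via a custom WindowAssigner that adds a per-key-group offset, so the panes don't all expire at the same instant. The arithmetic of what that buys is simple; the toy program below only models it, with `k % groups` as a deterministic stand-in for hashing keys to groups.

```java
// Toy arithmetic for staggered window expiry (hypothetical mitigation, not Flink code).
public class StaggeredCleanupModel {
    public static void main(String[] args) {
        int panes = 240_000;  // live panes per task at window end
        int groups = 8;       // hypothetical stagger groups, e.g. 7.5-minute offsets in a 1h window

        // Assign each pane to a stagger group; each group's window ends at a
        // different offset, so each group's panes clear in their own, smaller burst.
        int[] burstPerGroup = new int[groups];
        for (int k = 0; k < panes; k++) {
            burstPerGroup[k % groups]++; // stand-in for hashing keys to groups
        }
        int maxBurst = 0;
        for (int b : burstPerGroup) {
            maxBurst = Math.max(maxBurst, b);
        }

        // One shared window end clears everything at once; staggering divides
        // the worst-case burst by the number of groups (given even key spread).
        System.out.println("single-window burst: " + panes);
        System.out.println("largest staggered burst: " + maxBurst);
    }
}
```

The caveat is semantic: each group's window would cover a shifted hour, so aggregates are no longer time-aligned across keys. Whether that's acceptable depends on how the window results are consumed downstream.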