Hey Caio,

Your analysis of the problem sounds right to me; unfortunately, I don't have a good solution for you :(
> I’ve validated that CPU profiles show clearAllState using a significant
> amount of CPU.

Did you use something like async-profiler here? Do you have more info on the
breakdown of what used the CPU time? Once we know that, there might be an
opportunity to do such operations async/lazily, or to fix something in the
underlying platform (e.g. RocksDB is slow, ...).

On Thu, Mar 21, 2024 at 12:05 AM Caio Camatta via user <user@flink.apache.org> wrote:

> Hey Asimansu,
>
> The inputDataStream is a KeyedStream, I forgot to mention that.
>
> Caio
>
> On Wed, Mar 20, 2024 at 6:56 PM Asimansu Bera <asimansu.b...@gmail.com> wrote:
>
>> Hello Caio,
>>
>> Based on the pseudocode, there is no keyed function present. Hence, the
>> window will not be processed in parallel. Please check again and respond.
>>
>> val windowDataStream =
>>   inputDataStream
>>     .window(TumblingEventTimeWindows of 1 hour)
>>     .trigger(custom trigger)
>>     .aggregate(
>>       preAggregator = custom AggregateFunction,
>>       windowFunction = custom ProcessWindowFunction
>>     )
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>>
>> -A
>>
>> On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <user@flink.apache.org> wrote:
>>
>>> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
>>> and we’re seeing lag spikes on window closure. I’m curious if others have
>>> encountered similar issues before and if anyone has suggestions for how to
>>> tackle this problem (other than simply increasing parallelism).
>>>
>>> Context
>>>
>>> Lag definition
>>>
>>> We define end-to-end lag as the delta between the time when the event was
>>> persisted in Kafka and the time when Flink finishes processing the event.
>>>
>>> Window operator definition
>>>
>>> The important parts (in pseudocode):
>>>
>>> val windowDataStream =
>>>   inputDataStream
>>>     .window(TumblingEventTimeWindows of 1 hour)
>>>     .trigger(custom trigger)
>>>     .aggregate(
>>>       preAggregator = custom AggregateFunction,
>>>       windowFunction = custom ProcessWindowFunction
>>>     )
>>>
>>> The custom trigger emits TriggerResult.CONTINUE in onEventTime, i.e. we
>>> don’t run any user-defined logic at the end of the window. (This trigger
>>> only fires while the window is active, via custom logic in onElement.)
>>>
>>> Numbers
>>>
>>> Our Flink app processes ~3K events per second, and I’ve calculated that
>>> there are around 200-300K panes to close per Task at the end of the 1-hour
>>> window. Our lag is fairly stable at a few hundred milliseconds during the
>>> window but spikes to 5-10 seconds when the window expires, which is a
>>> problem for us.
>>>
>>> The issue
>>>
>>> The magnitude of the lag spikes on window closure correlates with
>>>
>>> - the size of the window (a 1-hour window has bigger spikes than a
>>>   5-minute window),
>>> - the cardinality of the keys in the event stream, and
>>> - the number of events being processed per second.
>>>
>>> In other words, the more panes to close, the bigger the lag spike. I'm
>>> fairly sure that the lag is coming entirely from the WindowOperator’s
>>> clearAllState, and I’ve validated that CPU profiles show clearAllState
>>> using a significant amount of CPU.
>>>
>>> Does this theory sound plausible? What could we do to minimize the effects
>>> of window clean-up? It would be nice to do it incrementally or
>>> asynchronously, but I'm not sure if Flink provides this functionality
>>> today.
>>>
>>> Thanks,
>>> Caio
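
For anyone who wants something concrete to experiment with, below is a minimal, self-contained Scala sketch of the kind of pipeline described in this thread: a keyed tumbling event-time window with a custom trigger and an AggregateFunction plus ProcessWindowFunction, using the (now-deprecated) Scala DataStream API that ships with Flink 1.16. The Event case class, the counting aggregate, the fire-on-every-element trigger logic, and all names are illustrative assumptions, not the actual code from Caio's job; the trigger only mirrors the described behaviour of returning TriggerResult.CONTINUE in onEventTime.

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical event type; the real schema isn't shown in the thread.
case class Event(key: String, value: Long, timestamp: Long)

// Fires only from onElement while the window is active (here: on every
// element, purely for illustration) and returns CONTINUE at end-of-window,
// mirroring the behaviour described in the thread.
class ActiveWindowTrigger extends Trigger[Event, TimeWindow] {
  override def onElement(element: Event, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE // real logic would be more selective

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE // no user-defined work when the window ends

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = ()
}

// Incremental pre-aggregation: keeps only a running count per key and pane.
class CountAggregate extends AggregateFunction[Event, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: Event, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Emits (key, count) whenever the trigger fires.
class EmitCount extends ProcessWindowFunction[Long, (String, Long), String, TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[Long],
                       out: Collector[(String, Long)]): Unit =
    out.collect((key, elements.head))
}

object WindowJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source; the real job reads from Kafka.
    val inputDataStream: DataStream[Event] = env.fromElements(
      Event("a", 1L, 0L), Event("b", 2L, 10L))

    val windowDataStream = inputDataStream
      .assignAscendingTimestamps(_.timestamp)  // event-time timestamps/watermarks
      .keyBy(_.key)                            // keyed, so the window operator can run in parallel
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .trigger(new ActiveWindowTrigger)
      .aggregate(new CountAggregate, new EmitCount)

    windowDataStream.print()
    env.execute("windowed-aggregation-sketch")
  }
}

The keyBy is the part Asimansu was asking about: without it the window operator runs at parallelism 1, whereas on a KeyedStream the per-pane work, including the state clean-up when the window expires, is at least spread across the parallel subtasks.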