Thanks for taking the time to answer this.

- You're correct that SimpleAggregator is not used in the job setup; I didn't copy the correct piece of code.
- I understand the overhead involved, but I don't agree with the O(n^2) complexity. Are you implying that Vector append is O(n) by itself? (See the sketch right after this list for how I read your cost model.)
- I follow your points regarding the process function, except for "without touching the previously stored events". With AggregateFunction + concatenation I likewise don't touch any elements other than the new one.
- By the way, I forgot to mention that the issue also reproduces with Lists, which should be much faster for appends and concats.
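To make sure we're arguing about the same cost model, here is a rough, self-contained sketch of what I understand happens per event with an AggregateFunction on a RocksDB backend. The serialize/deserialize helpers are plain-Java-serialization placeholders, not Flink's actual serializer stack:

import java.io._

// Rough sketch, not Flink code: models the per-event accumulator round trip
// of an AggregateFunction on a RocksDB backend. The append itself is cheap;
// the ever-growing serialize/deserialize round trip is what turns quadratic.
object QuadraticStateSketch extends App {
  def serialize(v: Vector[String]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(v)
    oos.close()
    bos.toByteArray
  }

  def deserialize(bytes: Array[Byte]): Vector[String] =
    new ObjectInputStream(new ByteArrayInputStream(bytes))
      .readObject().asInstanceOf[Vector[String]]

  var stored = serialize(Vector.empty[String])
  var totalBytes = 0L
  for (i <- 1 to 1000) {
    val acc = deserialize(stored)          // reads the whole accumulator: O(i)
    stored = serialize(acc :+ s"url-$i")   // :+ is effectively O(1); reserializing is O(i)
    totalBytes += stored.length
  }
  // Bytes moved grow like 1 + 2 + ... + n, i.e. O(n^2) per window,
  // no matter whether the accumulator is a Vector or a List.
  println(s"total bytes serialized: $totalBytes")
}

If this read-modify-write round trip is what you mean, I can see where O(n^2) comes from even when the append itself is O(1).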
Could the overhead by itself account for the backpressure? From this job, the only conclusion is that Flink just cannot do aggregating operations which collect values, only simple operations which produce a scalar value (like sum/avg). It seems weird to me that Flink would be so limited in this way.
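For reference, the ProcessWindowFunction variant I mention in the quoted mail below (the one that runs without backpressure) looks roughly like this. Event and Session are the same types as in the quoted code, and the builder values are placeholders, as in the reduced job:

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Simplified sketch of the variant that does not backpressure: Flink buffers
// the window's events in list state and hands them over once, when the
// session closes, so there is no per-event accumulator round trip.
class SessionProcessFunction
    extends ProcessWindowFunction[Event, Session, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[Event],
                       out: Collector[Session]): Unit = {
    // A single pass over the already-collected events; all appends and
    // concatenations happen on the heap, after the window has fired.
    val urls  = elements.map(_.getEnvUrl).toVector   // the kind of list the real job accumulates
    val times = elements.map(_.getTime.longValue()).toVector

    out.collect(
      Session.newBuilder()
        .setSessionDuration(1000)                    // placeholder, as in the reduced job
        .setSessionTotalEvents(times.size)
        .setSId("-" + java.util.UUID.randomUUID().toString)
        .build()
    )
  }
}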
On Wed, Oct 20, 2021 at 7:03 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Ori,
>
> Just a couple of comments (some code is missing for a concise explanation):
>
> - SimpleAggregator is not used in the job setup below (assuming another job setup)
> - SimpleAggregator is called for each event that goes into a specific session window, however:
>   - the Scala vectors keep growing with the number of events that end up in a single window, hence
>   - your BigO complexity will be O(n^2), n: number of events in the window (or worse)
>   - for each event, the accumulator is retrieved from window state and stored back to window state (and serialized, if on the RocksDB backend)
> - On the other hand, when you use a process function:
>   - Flink keeps a list state of the events belonging to the session window, and
>   - only when the window is triggered (on session gap timeout) are all events retrieved from window state and processed
>   - on the RocksDB backend, new events added to the window are appended to the existing window state key without touching the previously stored events, hence
>   - serialization is done only once per incoming event, and
>   - BigO complexity is around O(n)
>
> … much simplified
>
> When I started with similar questions I spent quite some time in the debugger, breaking into the windowing functions and going up the call stack, in order to understand how Flink works … time well spent
>
> I hope this helps …
>
> I won't be able to follow up for the next 1 ½ weeks, unless you try to meet me at the Flink Forward conference …
>
> Thias
>
> From: Ori Popowski <ori....@gmail.com>
> Sent: Wednesday, October 20, 2021 16:17
> To: user <user@flink.apache.org>
> Subject: Huge backpressure when using AggregateFunction with Session Window
>
> I have a simple Flink application with a simple keyBy, a SessionWindow, and I use an AggregateFunction to incrementally aggregate a result and write to a Sink.
>
> Some of the requirements involve accumulating lists of fields from the events (for example, all URLs), so not all the values in the end should be primitives (although some are, like the total number of events and the session duration).
>
> This job is experiencing huge backpressure 40 minutes after launching.
>
> I've found out that the append and concatenate operations in the logic of my AggregateFunction's add() and merge() functions are what's ruining the job (i.e. causing the backpressure).
>
> I've managed to create a reduced version of my job, where I just append and concatenate some of the event values, and I can confirm that backpressure starts just 40 minutes after launching the job:
>
> class SimpleAggregator extends AggregateFunction[Event, Accumulator, Session] with LazyLogging {
>
>   override def createAccumulator(): Accumulator = (
>     Vector.empty,
>     Vector.empty,
>     Vector.empty,
>     Vector.empty,
>     Vector.empty
>   )
>
>   override def add(value: Event, accumulator: Accumulator): Accumulator = {
>     (
>       accumulator._1 :+ value.getEnvUrl,
>       accumulator._2 :+ value.getCtxVisitId,
>       accumulator._3 :+ value.getVisionsSId,
>       accumulator._4 :+ value.getTime.longValue(),
>       accumulator._5 :+ value.getTime.longValue()
>     )
>   }
>
>   override def merge(a: Accumulator, b: Accumulator): Accumulator = {
>     (
>       a._1 ++ b._1,
>       a._2 ++ b._2,
>       a._3 ++ b._3,
>       a._4 ++ b._4,
>       a._5 ++ b._5
>     )
>   }
>
>   override def getResult(accumulator: Accumulator): Session = {
>     Session.newBuilder()
>       .setSessionDuration(1000)
>       .setSessionTotalEvents(1000)
>       .setSId("-" + UUID.randomUUID().toString)
>       .build()
>   }
> }
>
> This is the job overall (simplified version):
>
> class App(
>   source: SourceFunction[Event],
>   sink: SinkFunction[Session]
> ) {
>
>   def run(config: Config): Unit = {
>     val senv = StreamExecutionEnvironment.getExecutionEnvironment
>     senv.setMaxParallelism(256)
>     val dataStream = senv.addSource(source).uid("source")
>     dataStream
>       .assignAscendingTimestamps(_.getTime)
>       .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
>       .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
>       .allowedLateness(0.seconds.asFlinkTime)
>       .process(new ProcessFunction).uid("process-session")
>       .addSink(sink).uid("sink")
>
>     senv.execute("session-aggregation")
>   }
> }
>
> After 3 weeks of grueling debugging, profiling, checking the serialization, and more, I couldn't solve the backpressure issue.
>
> However, I got an idea and used Flink's ProcessWindowFunction, which just aggregates all the events behind the scenes and gives them to me as an iterator, where I can then do all my calculations.
>
> Surprisingly, there's no backpressure. So even though the ProcessWindowFunction actually aggregates more data, and also does concatenations and appends, for some reason there's no backpressure.
>
> To finish this long post, what I'm trying to understand here is why, when I collected the events using an AggregateFunction, there was backpressure, but when Flink does this for me with a ProcessWindowFunction, there is none. It seems to me something is fundamentally wrong here, since it means I cannot do any non-reducing operations without creating backpressure. I think it shouldn't cause the backpressure I experienced. I'm trying to understand what I did wrong here.
>
> Thanks!
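P.S. If I read the RocksDB point correctly, the list-state pattern boils down to something like the following sketch, where a mutable buffer stands in for RocksDB list state (placeholder names, not actual Flink internals):

import scala.collection.mutable.ArrayBuffer

// Rough sketch of the list-state pattern described above: each event is
// serialized once and appended blindly, without reading what is already stored.
object AppendOnlySketch extends App {
  val store = ArrayBuffer.empty[Array[Byte]]  // stands in for RocksDB list state

  def appendOnly(event: String): Unit =
    store += event.getBytes("UTF-8")          // O(|event|), independent of window size

  (1 to 1000).foreach(i => appendOnly(s"url-$i"))

  // Only when the session window fires is everything read back, once: O(n) total.
  val events = store.map(bytes => new String(bytes, "UTF-8")).toVector
  println(s"events handed to the window function: ${events.size}")
}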