Thanks for taking the time to answer this.

   - You're correct that the SimpleAggregator is not used in the job setup.
   I didn't copy the correct piece of code.
   - I understand the overhead involved. But I do not agree with the O(n^2)
   complexity. Are you implying that Vector append is O(n) by itself? (See
   the quick micro-check below, after this list.)
   - I understand your points regarding ProcessFunction, except for the
   "without touching the previously stored events" part. With
   AggregateFunction + concatenation I also don't touch any elements other
   than the new one. By the way, I forgot to mention that the issue also
   reproduces with Lists, which should be much faster for appends and
   concats.

Could overhead by itself account for the backpressure?
From this job the only conclusion is that Flink just cannot do aggregating
operations which collect values, only simple operations which produce
scalar values (like sum/avg). It seems weird to me that Flink would be so
limited in such a way.
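
As a quick check of the append cost on its own, here is a hypothetical
micro-benchmark (not from the job; the object and names are mine). Appending
n elements to an immutable Vector scales roughly linearly in total, i.e.
each append is effectively O(1) amortized:

    object VectorAppendCheck extends App {
      // Time a block and print the elapsed milliseconds
      def time[A](label: String)(body: => A): A = {
        val t0 = System.nanoTime()
        val result = body
        println(f"$label: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
        result
      }

      // If :+ were O(n) per call, doubling n would quadruple the total
      // time; in practice it roughly doubles.
      for (n <- Seq(100000, 200000, 400000)) {
        time(s"append n=$n")((1 to n).foldLeft(Vector.empty[Int])(_ :+ _))
      }
    }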



On Wed, Oct 20, 2021 at 7:03 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Ori,
>
>
>
> Just a couple of comments (some code is missing, so this is not a
> conclusive explanation):
>
>    - SimpleAggregator is not used in the job setup below (assuming
>    another job setup)
>    - SimpleAggregator is called for each event that goes into a specific
>    session window, however
>       - The Scala vectors grow ever larger with the number of events that
>       end up in a single window, hence
>       - Your BigO complexity will be O(n^2), n: number of events in
>       window (or worse)
>       - For each event the accumulator is retrieved from window state and
>       stored to window state (and serialized, if on RocksDB Backend)
>    - On the other hand when you use a process function
>       - Flink keeps a list state of events belonging to the session
>       window, and
>       - Only when the window is triggered (on session gap timeout) are all
>       events retrieved from window state and processed
>       - On RocksDbBackend the new events added to the window are appended
>       to the existing window state key without touching the previously stored
>       events, hence
>       - Serialization is only done once per incoming event, and
>       - BigO complexity is around O(n); see the back-of-the-envelope
>       estimate below
>
>
>
> … much simplified
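>
> To put rough numbers on it (my own back-of-the-envelope, assuming the
> RocksDB backend and an accumulator holding i elements when event i
> arrives): with the AggregateFunction the whole accumulator is deserialized
> and re-serialized for every event, so the total bytes moved are roughly
> 1 + 2 + … + n ≈ n²/2, i.e. O(n²); with the list state behind a process
> function, RocksDB appends each new event to the existing key, so the total
> is roughly n × eventSize, i.e. O(n).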
>
>
>
> When I started with similar questions I spent quite some time in the
> debugger, breaking into the windowing functions and going up the call
> stack, in order to understand how Flink works … time well spent
>
>
>
>
>
> I hope this helps …
>
>
>
> I won’t be able to follow up for the next 1 ½ weeks, unless you try to
> meet me at the Flink Forward conference …
>
>
>
> Thias
>
>
>
> *From:* Ori Popowski <ori....@gmail.com>
> *Sent:* Mittwoch, 20. Oktober 2021 16:17
> *To:* user <user@flink.apache.org>
> *Subject:* Huge backpressure when using AggregateFunction with Session
> Window
>
>
>
> I have a simple Flink application with a keyBy and a session window; I use
> an AggregateFunction to incrementally aggregate a result and write to a
> sink.
>
>
>
> Some of the requirements involve accumulating lists of fields from the
> events (for example, all URLs), so not all of the final values are
> primitives (although some are, like the total number of events and the
> session duration).
>
>
>
> This job experiences huge backpressure starting 40 minutes after launch.
>
>
>
> I've found out that the append and concatenate operations in the logic of
> my AggregateFunction's add() and merge() functions are what's ruining the
> job (i.e. causing the backpressure).
>
>
>
> I've managed to create a reduced version of my job, where I just append
> and concatenate some of the event values, and I can confirm that
> backpressure starts just 40 minutes after launching the job:
>
>
>
>     class SimpleAggregator
>         extends AggregateFunction[Event, Accumulator, Session]
>         with LazyLogging {
>
>       override def createAccumulator(): Accumulator = (
>         Vector.empty,
>         Vector.empty,
>         Vector.empty,
>         Vector.empty,
>         Vector.empty
>       )
>
>       override def add(value: Event, accumulator: Accumulator): Accumulator = {
>         (
>           accumulator._1 :+ value.getEnvUrl,
>           accumulator._2 :+ value.getCtxVisitId,
>           accumulator._3 :+ value.getVisionsSId,
>           accumulator._4 :+ value.getTime.longValue(),
>           accumulator._5 :+ value.getTime.longValue()
>         )
>       }
>
>       override def merge(a: Accumulator, b: Accumulator): Accumulator = {
>         (
>           a._1 ++ b._1,
>           a._2 ++ b._2,
>           a._3 ++ b._3,
>           a._4 ++ b._4,
>           a._5 ++ b._5
>         )
>       }
>
>       override def getResult(accumulator: Accumulator): Session = {
>         Session.newBuilder()
>           .setSessionDuration(1000)
>           .setSessionTotalEvents(1000)
>           .setSId("-" + UUID.randomUUID().toString)
>           .build()
>       }
>     }
>
>
>
> This is the job overall (simplified version):
>
>
>
>     class App(
>       source: SourceFunction[Event],
>       sink: SinkFunction[Session]
>     ) {
>
>       def run(config: Config): Unit = {
>         val senv = StreamExecutionEnvironment.getExecutionEnvironment
>         senv.setMaxParallelism(256)
>         val dataStream = senv.addSource(source).uid("source")
>         dataStream
>           .assignAscendingTimestamps(_.getTime)
>           .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
>           .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
>           .allowedLateness(0.seconds.asFlinkTime)
>           .process(new ProcessFunction).uid("process-session")
>           .addSink(sink).uid("sink")
>
>         senv.execute("session-aggregation")
>       }
>     }
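>
> Note: the failing variant is not shown above; presumably it wired the
> window to the aggregator instead of the process function, along these
> lines (reconstructed, since the exact line wasn't posted):
>
>           .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
>           .aggregate(new SimpleAggregator).uid("aggregate-session")
>           .addSink(sink).uid("sink")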
>
>
>
> After 3 weeks of grueling debugging, profiling, checking the
> serialization, and more, I couldn't solve the backpressure issue.
>
> However, I got an idea and used Flink's ProcessWindowFunction, which
> simply collects all the events behind the scenes and gives them to me as
> an iterable, where I can then do all my calculations.
>
> Surprisingly, there's no backpressure. So even though the
> ProcessWindowFunction actually aggregates more data, and also does
> concatenations and appends, for some reason there's no backpressure.
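>
> For reference, a minimal sketch of that variant (reconstructed; the class
> name is made up, and Event/Session are the same types as above):
>
>     import java.util.UUID
>
>     import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
>     import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>     import org.apache.flink.util.Collector
>
>     class SessionProcessFunction
>         extends ProcessWindowFunction[Event, Session, String, TimeWindow] {
>
>       override def process(
>           key: String,
>           context: Context,
>           elements: Iterable[Event],
>           out: Collector[Session]): Unit = {
>         // All session events arrive together only when the window fires;
>         // until then, each incoming event is appended once to list state.
>         val urls = elements.map(_.getEnvUrl).toVector // e.g. collect all URLs
>         out.collect(
>           Session.newBuilder()
>             .setSessionTotalEvents(elements.size)
>             .setSId("-" + UUID.randomUUID().toString)
>             .build()
>         )
>       }
>     }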
>
>
>
> To finish this long post, what I'm trying to understand here is why, when
> I collected the events using an AggregateFunction, there was backpressure,
> but when Flink does this for me with ProcessWindowFunction, there is none.
> It seems to me something is fundamentally wrong here, since it means I
> cannot do any non-reducing operations without creating backpressure. I
> don't think it should cause the backpressure I experienced. I'm trying to
> understand what I did wrong here.
>
>
>
> Thanks!