I didn't try to reproduce it locally since this job reads 14K events per
second.
I am using Flink version 1.12.1 and RocksDB state backend. It also happens
with Flink 1.10.

I tried profiling with JVisualVM and didn't see any bottleneck: the user
functions took almost no CPU time.

On Wed, Oct 20, 2021 at 6:50 PM Timo Walther <twal...@apache.org> wrote:

> Hi Ori,
>
> this indeed sounds strange. Can you also reproduce this behavior locally
> with a faker source? We should definitely attach a profiler and see where
> the bottleneck lies.
>
> Which Flink version and state backend are you using?
>
> Regards,
> Timo
>
> On 20.10.21 16:17, Ori Popowski wrote:
> > I have a simple Flink application with a keyBy, a session window, and
> > an AggregateFunction that incrementally aggregates a result, which is
> > then written to a sink.
> >
> > Some of the requirements involve accumulating lists of fields from the
> > events (for example, all URLs), so not all of the final values are
> > primitives (although some are, like the total number of events and the
> > session duration).
> >
> > This job experiences huge backpressure 40 minutes after launching.
> >
> > I've found that the append and concatenate operations in my
> > AggregateFunction's add() and merge() methods are what's ruining the
> > job (i.e. causing the backpressure).
> >
> > I've managed to create a reduced version of my job, where I just append
> > and concatenate some of the event values, and I can confirm that
> > backpressure starts just 40 minutes after launching the job:
> >
> > // Accumulator is presumably a type alias along these lines:
> > //   type Accumulator = (Vector[String], Vector[String], Vector[String],
> > //                       Vector[Long], Vector[Long])
> > class SimpleAggregator
> >     extends AggregateFunction[Event, Accumulator, Session]
> >     with LazyLogging {
> >
> >   override def createAccumulator(): Accumulator = (
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty
> >   )
> >
> >   // Appends the event's fields to the five accumulated vectors
> >   override def add(value: Event, accumulator: Accumulator): Accumulator = {
> >     (
> >       accumulator._1 :+ value.getEnvUrl,
> >       accumulator._2 :+ value.getCtxVisitId,
> >       accumulator._3 :+ value.getVisionsSId,
> >       accumulator._4 :+ value.getTime.longValue(),
> >       accumulator._5 :+ value.getTime.longValue()
> >     )
> >   }
> >
> >   // Concatenates the vectors of two accumulators
> >   override def merge(a: Accumulator, b: Accumulator): Accumulator = {
> >     (
> >       a._1 ++ b._1,
> >       a._2 ++ b._2,
> >       a._3 ++ b._3,
> >       a._4 ++ b._4,
> >       a._5 ++ b._5
> >     )
> >   }
> >
> >   // The result is hard-coded; the accumulated values don't matter for
> >   // reproducing the backpressure
> >   override def getResult(accumulator: Accumulator): Session = {
> >     Session.newBuilder()
> >       .setSessionDuration(1000)
> >       .setSessionTotalEvents(1000)
> >       .setSId("-" + UUID.randomUUID().toString)
> >       .build()
> >   }
> > }
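> >
> > (For context: in the reduced job, this aggregator is presumably wired
> > into the session window with something like the call below; the uid is
> > illustrative:)
> >
> >   .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
> >   .aggregate(new SimpleAggregator)
> >   .uid("aggregate-session")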
> >
> >
> > This is the job overall (simplified version):
> >
> > class App(
> >     source: SourceFunction[Event],
> >     sink: SinkFunction[Session]
> > ) {
> >
> >   def run(config: Config): Unit = {
> >     val senv = StreamExecutionEnvironment.getExecutionEnvironment
> >     senv.setMaxParallelism(256)
> >
> >     val dataStream = senv.addSource(source).uid("source")
> >     dataStream
> >       .assignAscendingTimestamps(_.getTime)
> >       .keyBy(event =>
> >         (event.getWmUId, event.getWmEnv, event.getSId).toString())
> >       .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
> >       .allowedLateness(0.seconds.asFlinkTime)
> >       // ProcessFunction here is presumably a custom window function,
> >       // not Flink's org.apache.flink.streaming.api.functions.ProcessFunction
> >       .process(new ProcessFunction).uid("process-session")
> >       .addSink(sink).uid("sink")
> >
> >     senv.execute("session-aggregation")
> >   }
> > }
> >
> >
> > After 3 weeks of grueling debugging, profiling, checking the
> > serialization, and more, I couldn't solve the backpressure issue.
> > Then I had an idea: I switched to Flink's ProcessWindowFunction, which
> > collects all the window's events behind the scenes and hands them to me
> > as an iterable, where I can then do all my calculations.
> > Surprisingly, there's no backpressure. So even though the
> > ProcessWindowFunction actually buffers more data, and my code still
> > does concatenations and appends, for some reason there's no backpressure.
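> >
> > (For reference, a minimal sketch of what that variant might look like.
> > SessionProcessFunction is a hypothetical name; the String key type
> > matches the keyBy above, the Session builder calls are carried over
> > from the reduced job, and the duration/count computations and setter
> > argument types are assumptions:)
> >
> > import java.util.UUID
> >
> > import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > import org.apache.flink.util.Collector
> >
> > class SessionProcessFunction
> >     extends ProcessWindowFunction[Event, Session, String, TimeWindow] {
> >
> >   override def process(
> >       key: String,
> >       context: Context,
> >       elements: Iterable[Event],
> >       out: Collector[Session]): Unit = {
> >     // Flink has already buffered the window's events in state; the
> >     // appends and concatenations now run over the materialized iterable
> >     // (non-empty, since a window only fires with at least one event)
> >     val times = elements.map(_.getTime.longValue()).toVector
> >     out.collect(
> >       Session.newBuilder()
> >         .setSessionDuration((times.max - times.min).toInt)
> >         .setSessionTotalEvents(elements.size)
> >         .setSId("-" + UUID.randomUUID().toString)
> >         .build())
> >   }
> > }
> >
> > The pipeline then calls .process(new SessionProcessFunction) in place
> > of the aggregate step.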
> >
> > To finish this long post: what I'm trying to understand is why
> > collecting the events myself in an AggregateFunction caused
> > backpressure, while letting Flink collect them for me in a
> > ProcessWindowFunction does not. Something seems fundamentally wrong
> > here, since it implies I cannot do any non-reducing operation without
> > creating backpressure. I don't think what I did should cause the
> > backpressure I experienced, and I'm trying to understand what I did
> > wrong.
> >
> > Thanks!
>
>
