I didn't try to reproduce it locally, since this job reads 14K events per second. I'm using Flink 1.12.1 with the RocksDB state backend. It also happens with Flink 1.10.
I tried profiling with JVisualVM and I didn't see any bottleneck: all the user functions took almost no CPU time.

On Wed, Oct 20, 2021 at 6:50 PM Timo Walther <twal...@apache.org> wrote:

> Hi Ori,
>
> this sounds indeed strange. Can you also reproduce this behavior locally
> with a faker source? We should definitely add a profiler and see where
> the bottleneck lies.
>
> Which Flink version and state backend are you using?
>
> Regards,
> Timo
>
> On 20.10.21 16:17, Ori Popowski wrote:
> > I have a simple Flink application with a simple keyBy, a SessionWindow,
> > and an AggregateFunction that incrementally aggregates a result and
> > writes to a Sink.
> >
> > Some of the requirements involve accumulating lists of fields from the
> > events (for example, all URLs), so not all of the final values are
> > primitives (although some are, like the total number of events and the
> > session duration).
> >
> > This job experiences huge backpressure 40 minutes after launching.
> >
> > I've found out that the append and concatenate operations in my
> > AggregateFunction's add() and merge() methods are what's ruining the
> > job (i.e. causing the backpressure).
> >
> > I've managed to create a reduced version of my job, where I just append
> > and concatenate some of the event values, and I can confirm that the
> > backpressure starts just 40 minutes after launching the job:
> >
> > // Accumulator is assumed to be a type alias for a 5-tuple, roughly:
> > // type Accumulator =
> > //   (Vector[String], Vector[String], Vector[String], Vector[Long], Vector[Long])
> > class SimpleAggregator
> >   extends AggregateFunction[Event, Accumulator, Session] with LazyLogging {
> >
> >   override def createAccumulator(): Accumulator = (
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty,
> >     Vector.empty
> >   )
> >
> >   override def add(value: Event, accumulator: Accumulator): Accumulator = {
> >     (
> >       accumulator._1 :+ value.getEnvUrl,
> >       accumulator._2 :+ value.getCtxVisitId,
> >       accumulator._3 :+ value.getVisionsSId,
> >       accumulator._4 :+ value.getTime.longValue(),
> >       accumulator._5 :+ value.getTime.longValue()
> >     )
> >   }
> >
> >   override def merge(a: Accumulator, b: Accumulator): Accumulator = {
> >     (
> >       a._1 ++ b._1,
> >       a._2 ++ b._2,
> >       a._3 ++ b._3,
> >       a._4 ++ b._4,
> >       a._5 ++ b._5
> >     )
> >   }
> >
> >   override def getResult(accumulator: Accumulator): Session = {
> >     Session.newBuilder()
> >       .setSessionDuration(1000)
> >       .setSessionTotalEvents(1000)
> >       .setSId("-" + UUID.randomUUID().toString)
> >       .build()
> >   }
> > }
> >
> > This is the job overall (simplified version):
> >
> > class App(
> >   source: SourceFunction[Event],
> >   sink: SinkFunction[Session]
> > ) {
> >
> >   def run(config: Config): Unit = {
> >     val senv = StreamExecutionEnvironment.getExecutionEnvironment
> >     senv.setMaxParallelism(256)
> >     val dataStream = senv.addSource(source).uid("source")
> >     dataStream
> >       .assignAscendingTimestamps(_.getTime)
> >       .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
> >       .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
> >       .allowedLateness(0.seconds.asFlinkTime)
> >       .process(new ProcessFunction).uid("process-session")
> >       .addSink(sink).uid("sink")
> >
> >     senv.execute("session-aggregation")
> >   }
> > }
> >
> > After 3 weeks of grueling debugging, profiling, checking the
> > serialization, and more, I couldn't solve the backpressure issue.
> > However, I got an idea: I used Flink's ProcessWindowFunction, which
> > buffers all the events behind the scenes and hands them to me as an
> > iterable, where I can then do all my calculations (rough sketch below).
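> >
> > For illustration, a rough, simplified sketch of that alternative (the
> > Event getters and the Session builder mirror the reduced job above;
> > the class name and the duration/count logic are just placeholders):
> >
> > import java.util.UUID
> > import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
> > import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> > import org.apache.flink.util.Collector
> >
> > class SessionWindowFunction
> >   extends ProcessWindowFunction[Event, Session, String, TimeWindow] {
> >
> >   override def process(
> >     key: String,
> >     context: Context,
> >     elements: Iterable[Event],
> >     out: Collector[Session]
> >   ): Unit = {
> >     // Flink buffers the window's events in state and hands them all
> >     // over here at once, so the appends below run once per window
> >     // instead of once per incoming event.
> >     val urls = elements.map(_.getEnvUrl).toVector // list accumulation, as in add() above
> >     val times = elements.map(_.getTime.longValue()).toVector
> >
> >     out.collect(
> >       Session.newBuilder()
> >         .setSessionDuration(times.max - times.min)
> >         .setSessionTotalEvents(times.size)
> >         .setSId("-" + UUID.randomUUID().toString)
> >         .build()
> >     )
> >   }
> > }
> >
> > It's wired in via the same .process(...) call after window(...) in the
> > job above.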
> > Surprisingly, there's no backpressure. So even though the
> > ProcessWindowFunction actually holds more data, and also does
> > concatenations and appends, for some reason there's no backpressure.
> >
> > To finish this long post: what I'm trying to understand is why
> > collecting the events with an AggregateFunction caused backpressure,
> > while letting Flink do it for me with a ProcessWindowFunction does
> > not. It seems to me something is fundamentally wrong here, since it
> > means I cannot do any non-reducing operations without creating
> > backpressure. Simply appending values shouldn't cause the
> > backpressure I experienced, and I'm trying to understand what I did
> > wrong.
> >
> > Thanks!