Hi Ori,
this indeed sounds strange. Can you also reproduce this behavior locally
with a faker source? We should definitely attach a profiler and see where
the bottleneck lies.
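For a local run, something along these lines could already be enough as a
generated source (just a sketch: I'm assuming Event is your Avro-generated
class with a builder, and all field values below are placeholders):

import org.apache.flink.streaming.api.functions.source.SourceFunction

class FakeEventSource extends SourceFunction[Event] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    var i = 0L
    while (running) {
      // Placeholder values; the setters mirror the getters used in your job
      val event = Event.newBuilder()
        .setEnvUrl(s"https://example.com/page/$i")
        .setCtxVisitId((i % 1000).toString)
        .setVisionsSId((i % 100).toString)
        .setWmUId((i % 50).toString)
        .setWmEnv("local")
        .setSId((i % 100).toString)
        .setTime(System.currentTimeMillis())
        .build()
      ctx.collect(event)
      i += 1
    }
  }

  override def cancel(): Unit = running = false
}

// e.g. new App(new FakeEventSource, new PrintSinkFunction[Session]).run(config)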
Which Flink version and state backend are you using?
Regards,
Timo
On 20.10.21 16:17, Ori Popowski wrote:
I have a simple Flink application with a keyBy, a session window, and an
AggregateFunction that incrementally aggregates a result, which is then
written to a sink.
Some of the requirements involve accumulating lists of fields from the
events (for example, all URLs), so not all of the resulting values are
primitives (although some are, like the total number of events and the
session duration).
This job experiences huge backpressure 40 minutes after launching.
I've found that the append and concatenate operations in my
AggregateFunction's add() and merge() methods are what's ruining the job
(i.e. causing the backpressure).
I've managed to create a reduced version of my job, where I just append
and concatenate some of the event values, and I can confirm that
backpressure starts just 40 minutes after launching the job:
import java.util.UUID

import org.apache.flink.api.common.functions.AggregateFunction

// Accumulator is a type alias for the five per-field vectors built up below:
// (Vector[String], Vector[String], Vector[String], Vector[Long], Vector[Long])
class SimpleAggregator extends AggregateFunction[Event, Accumulator, Session]
  with LazyLogging {

  override def createAccumulator(): Accumulator = (
    Vector.empty,
    Vector.empty,
    Vector.empty,
    Vector.empty,
    Vector.empty
  )

  // Appends the event's fields to the accumulated vectors
  override def add(value: Event, accumulator: Accumulator): Accumulator = {
    (
      accumulator._1 :+ value.getEnvUrl,
      accumulator._2 :+ value.getCtxVisitId,
      accumulator._3 :+ value.getVisionsSId,
      accumulator._4 :+ value.getTime.longValue(),
      accumulator._5 :+ value.getTime.longValue()
    )
  }

  // Concatenates two accumulators (called when session windows merge)
  override def merge(a: Accumulator, b: Accumulator): Accumulator = {
    (
      a._1 ++ b._1,
      a._2 ++ b._2,
      a._3 ++ b._3,
      a._4 ++ b._4,
      a._5 ++ b._5
    )
  }

  // Placeholder result in this reduced version
  override def getResult(accumulator: Accumulator): Session = {
    Session.newBuilder()
      .setSessionDuration(1000)
      .setSessionTotalEvents(1000)
      .setSId("-" + UUID.randomUUID().toString)
      .build()
  }
}
This is the job overall (simplified version):
class App(
  source: SourceFunction[Event],
  sink: SinkFunction[Session]
) {

  def run(config: Config): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setMaxParallelism(256)

    val dataStream = senv.addSource(source).uid("source")

    dataStream
      .assignAscendingTimestamps(_.getTime)
      .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
      .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
      .allowedLateness(0.seconds.asFlinkTime)
      .process(new ProcessFunction).uid("process-session")
      .addSink(sink).uid("sink")

    senv.execute("session-aggregation")
  }
}
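In the reduced version, the window stage uses the SimpleAggregator instead of
the ProcessFunction, roughly like this (a sketch: the exact wiring and the uid
name are made up, everything else is as in the job above):

dataStream
  .assignAscendingTimestamps(_.getTime)
  .keyBy(event => (event.getWmUId, event.getWmEnv, event.getSId).toString())
  .window(EventTimeSessionWindows.withGap(config.sessionGap.asFlinkTime))
  .allowedLateness(0.seconds.asFlinkTime)
  .aggregate(new SimpleAggregator).uid("aggregate-session")
  .addSink(sink).uid("sink")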
After 3 weeks of grueling debugging, profiling, checking the
serialization, and more, I couldn't solve the backpressure issue.
However, I had an idea and switched to Flink's ProcessWindowFunction, which
just collects all the events behind the scenes and gives them to me as an
iterable, where I can then do all my calculations.
Surprisingly, there's no backpressure. So even though the
ProcessWindowFunction actually accumulates more data, and also does
concatenations and appends, for some reason there's no backpressure.
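The ProcessWindowFunction version looks roughly like this (a sketch: the class
name and uid are made up, and the real calculations replace the placeholder
Session values):

import java.util.UUID

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class SessionWindowFunction
  extends ProcessWindowFunction[Event, Session, String, TimeWindow] {

  override def process(
      key: String,
      context: Context,
      elements: Iterable[Event],
      out: Collector[Session]): Unit = {
    // Flink has already collected the window's events; the lists are built
    // here once per window instead of being appended to on every add()/merge()
    val urls     = elements.map(_.getEnvUrl).toVector
    val visitIds = elements.map(_.getCtxVisitId).toVector
    val times    = elements.map(_.getTime.longValue()).toVector

    // urls, visitIds, and times feed the real session calculations;
    // the placeholder values below mirror the reduced aggregator
    out.collect(
      Session.newBuilder()
        .setSessionDuration(1000)
        .setSessionTotalEvents(1000)
        .setSId("-" + UUID.randomUUID().toString)
        .build()
    )
  }
}

// wired in with .process(new SessionWindowFunction).uid("process-session")
// instead of the aggregate() call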
To finish this long post: what I'm trying to understand is why there was
backpressure when I collected the events myself with an AggregateFunction,
but none when Flink does the collecting for me with a ProcessWindowFunction.
It seems to me something is fundamentally wrong here, since it would mean I
cannot do any non-reducing operations without creating backpressure. I don't
think these operations should cause the backpressure I experienced, so I'm
trying to understand what I did wrong here.
Thanks!