Specifically, a lot of shared code assumes that repeatedly setting a timer is nearly free, roughly the same cost as determining whether or not to set the timer. ReduceFnRunner has been refactored so it would be very easy to set the GC timer once per window that occurs in a bundle, but fixing whatever underlying inefficiency makes repeated timer setting expensive would probably be a bigger win.
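To make the once-per-window idea concrete, here is a minimal sketch, not Beam code: windows are modeled as plain strings, timer registration is reduced to a counter, and the class and method names are hypothetical. It only illustrates deduplicating GC timer sets within a bundle.

```java
import java.util.HashSet;
import java.util.Set;

public class GcTimerDedup {
    // Current behavior: one timer set per processed element.
    static int timersSetPerElement(String[] elementWindows) {
        int timerSets = 0;
        for (String w : elementWindows) {
            timerSets++; // scheduleGarbageCollectionTimer(w) on every element
        }
        return timerSets;
    }

    // Proposed behavior: set the GC timer at most once per window per bundle.
    static int timersSetPerWindow(String[] elementWindows) {
        Set<String> windowsSeenThisBundle = new HashSet<>();
        int timerSets = 0;
        for (String w : elementWindows) {
            if (windowsSeenThisBundle.add(w)) {
                timerSets++; // only the first element of each window sets the timer
            }
        }
        return timerSets;
    }

    public static void main(String[] args) {
        String[] bundle = {"w1", "w1", "w2", "w1", "w2", "w3"};
        System.out.println(timersSetPerElement(bundle)); // 6
        System.out.println(timersSetPerWindow(bundle));  // 3
    }
}
```

On a runner where each timer set is a delete-plus-register against the timer service, the difference between 6 and 3 registrations per bundle scales with the number of elements per window.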
Kenn

On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:

> I think the short answer is that folks working on the Beam Flink runner
> have mostly been focused on getting everything working, and so have not
> dug into this performance too deeply. I suspect that there is low-hanging
> fruit to optimize as a result.
>
> You're right that ReduceFnRunner schedules a timer for each element. I
> think this code dates back to before Beam; on Dataflow timers are
> identified by tag, so this simply overwrites the existing timer, which is
> very cheap in Dataflow. If it is not cheap on Flink, this might be
> something to optimize.
>
> Reuven
>
> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I am interested in any knowledge or thoughts on what the overhead of
>> running Beam pipelines should be, or is, compared to pipelines written
>> on a "bare" runner. Is this something that is being tested or
>> investigated by the community? Is there a consensus on what bounds the
>> overhead should typically fall within? I realise this is very runner
>> specific, but certain things are imposed also by the SDK model itself.
>>
>> I tested a simple streaming pipeline on Flink vs Beam-on-Flink and found
>> very noticeable differences. I want to stress that it was not a
>> performance test. The job does the following:
>>
>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors ->
>> Reshuffle -> Report counter.inc() to metrics for throughput
>>
>> Both jobs had the same configuration and the same state backend with the
>> same checkpointing strategy. What I noticed from a few simple test runs:
>>
>> * On the first run, on Flink 1.5.0, CPU profiles on one worker showed
>> that ~50% of time was spent either on removing timers from
>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>> CoderUtils.clone().
>>
>> * The problem with timer deletion was addressed by FLINK-9423. I have
>> retested on Flink 1.7.2 and not much time is spent in timer deletion
>> now, but the root cause was not removed. It still remains that timers
>> are frequently registered and removed (I believe from
>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is
>> called per processed element?), which is noticeable in GC activity,
>> heap, and state.
>>
>> * In Flink I use the FileSystem state backend, which keeps state in an
>> in-memory CopyOnWriteStateTable that after some time is full of PaneInfo
>> objects. Maybe they come from PaneInfoTracker activity.
>>
>> * Coder clone is painful. A pure Flink job does copies between operators
>> too, in my case via Kryo.copy(), but this is not noticeable in the CPU
>> profile. Kryo.copy() copies at the object level, not object -> bytes ->
>> object, which is cheaper.
>>
>> Overall, my observation is that pure Flink can be roughly 3x faster.
>>
>> I do not know what I am trying to achieve here :) Probably just start a
>> discussion and collect thoughts and other experiences on the cost of
>> running some data processing on Beam and a particular runner.
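To illustrate the cloning difference discussed above: Beam's CoderUtils.clone() roundtrips each element through bytes, while Kryo.copy() builds a new object directly. This is a standalone sketch, not Beam or Kryo code; it uses plain Java serialization as a stand-in for a Coder, and a constructor call as a stand-in for an object-level copy, just to show where the extra allocations come from.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CloneCost {
    static class Event implements Serializable {
        final String key;
        final long value;
        Event(String key, long value) { this.key = key; this.value = value; }
    }

    // CoderUtils.clone() style: object -> bytes -> object. Allocates two
    // streams and a byte[] per element, plus the serialization CPU cost.
    static Event cloneViaBytes(Event e) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(e);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (Event) ois.readObject();
        }
    }

    // Kryo.copy() style: object-level copy, one allocation, no byte[]
    // intermediary.
    static Event cloneOnObjectLevel(Event e) {
        return new Event(e.key, e.value);
    }

    public static void main(String[] args) throws Exception {
        Event e = new Event("k", 42L);
        Event a = cloneViaBytes(e);
        Event b = cloneOnObjectLevel(e);
        System.out.println(a.key + ":" + a.value); // k:42
        System.out.println(b.key + ":" + b.value); // k:42
    }
}
```

Both paths produce an equivalent copy; the byte roundtrip shows up in profiles (e.g. the ByteArrayOutputStream hotspot mentioned above) because it is paid once per element flowing between operators.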