Thanks for starting this investigation. As mentioned, most of the work to date has been on feature parity, not performance parity, but we're at the point where the latter should be tackled as well. Even if there is a slight overhead (and there's talk about integrating more deeply with the Flume DAG that could elide even that), I'd expect it to be nowhere near the 3x that you're seeing. Aside from the timer issue, it sounds like the cloning via coders is a huge drag that needs to be addressed. I wonder if this is one of those cases where using the portability framework could be a performance win: specifically, no cloning would happen between operators of fused stages, and the cloning between operators could be on the raw bytes[] (if needed at all, because we know they wouldn't be mutated).
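To make the cloning cost concrete, here is a minimal sketch of the three copy strategies being compared: a coder-style round trip (object -> bytes -> object), an object-level copy, and a plain copy of already-encoded bytes. It uses only the JDK. Event, CloneSketch and all of the method names are hypothetical stand-ins for illustration, not Beam, Flink or Kryo API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical element type, used only for this illustration.
final class Event {
  final String key;
  final long value;

  Event(String key, long value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical helpers; these are not Beam, Flink or Kryo calls.
final class CloneSketch {

  // Serializes the element into a fresh byte array.
  static byte[] encode(Event e) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(e.key);
      out.writeLong(e.value);
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    return bytes.toByteArray();
  }

  // Rebuilds an element from its encoded form.
  static Event decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      return new Event(in.readUTF(), in.readLong());
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }

  // Coder-style clone: object -> bytes -> object. Pays for an encode,
  // a buffer allocation and a decode on every element.
  static Event cloneViaBytes(Event e) {
    return decode(encode(e));
  }

  // Object-level copy: no intermediate byte buffer.
  static Event cloneDirect(Event e) {
    return new Event(e.key, e.value);
  }

  // If elements travel between operators already encoded, a defensive
  // copy is just an array copy, and can be skipped entirely when the
  // bytes are known not to be mutated.
  static byte[] cloneEncoded(byte[] encoded) {
    return encoded.clone();
  }
}
```

The point of the sketch is only the shape of the work: cloneViaBytes does strictly more than either alternative for every element, which is consistent with ByteArrayOutputStream showing up in the CPU profile.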
On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <k...@apache.org> wrote:
>
> Specifically, a lot of shared code assumes that repeatedly setting a timer is
> nearly free / the same cost as determining whether or not to set the timer.
> ReduceFnRunner has been refactored in a way so it would be very easy to set
> the GC timer once per window that occurs in a bundle, but there's probably
> some underlying inefficiency around why this isn't cheap that would be a
> bigger win.
>
> Kenn
>
> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:
>>
>> I think the short answer is that folks working on the Beam Flink runner have
>> mostly been focused on getting everything working, and so have not dug into
>> this performance too deeply. I suspect that there is low-hanging fruit to
>> optimize as a result.
>>
>> You're right that ReduceFnRunner schedules a timer for each element. I think
>> this code dates back to before Beam; on Dataflow timers are identified by
>> tag, so this simply overwrites the existing timer, which is very cheap in
>> Dataflow. If it is not cheap on Flink, this might be something to optimize.
>>
>> Reuven
>>
>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I am interested in any knowledge or thoughts on what should be / is the
>>> overhead of running Beam pipelines instead of pipelines written on a "bare
>>> runner". Is this something which is being tested or investigated by the
>>> community? Is there a consensus on what bounds the overhead should
>>> typically fall within? I realise this is very runner specific, but certain
>>> things are imposed also by the SDK model itself.
>>>
>>> I tested a simple streaming pipeline on Flink vs Beam-Flink and found very
>>> noticeable differences. I want to stress that it was not a performance
>>> test. The job does the following:
>>>
>>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors ->
>>> Reshuffle -> Report counter.inc() to metrics for throughput
>>>
>>> Both jobs had the same configuration and the same state backend with the
>>> same checkpointing strategy. What I noticed from a few simple test runs:
>>>
>>> * first run on Flink 1.5.0: from CPU profiles on one worker I found that
>>> ~50% of the time was spent either on removing timers from
>>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>>> CoderUtils.clone()
>>>
>>> * the problem with timer deletion was addressed by FLINK-9423. I have
>>> retested on Flink 1.7.2 and not much time is spent in timer deletion now,
>>> but the root cause was not removed. It still remains that timers are
>>> frequently registered and removed (I believe from
>>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is called
>>> per processed element?), which is noticeable in GC activity, heap and
>>> state ...
>>>
>>> * in Flink I use the FileSystem state backend, which keeps state in an
>>> in-memory CopyOnWriteStateTable, which after some time is full of PaneInfo
>>> objects. Maybe they come from PaneInfoTracker activity
>>>
>>> * Coder clone is painful. A pure Flink job does copy between operators too,
>>> in my case via Kryo.copy(), but this is not noticeable in the CPU profile.
>>> Kryo.copy() copies at the object level, not object -> bytes -> object,
>>> which is cheaper
>>>
>>> Overall, my observation is that pure Flink can be roughly 3x faster.
>>>
>>> I do not know what I am trying to achieve here :) Probably just to start a
>>> discussion and collect thoughts and other experiences on the cost of
>>> running some data processing on Beam and a particular runner.
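On the per-element timer point in the quoted thread, the "set the GC timer once per window that occurs in a bundle" idea could look roughly like the sketch below. This is only an illustration under stated assumptions: PerBundleGcTimerSketch, startBundle, processElement and the LongConsumer callback are hypothetical names, not the ReduceFnRunner or Flink timer API, and a window is represented here just by its end timestamp in milliseconds.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical sketch: remember which windows already had their GC timer
// set during the current bundle, so the runner's timer service is hit at
// most once per window per bundle instead of once per element.
final class PerBundleGcTimerSketch {
  // Stand-in for the (possibly expensive) runner call that sets a timer;
  // the argument is the window end timestamp in milliseconds.
  private final LongConsumer setGcTimer;
  private final Set<Long> windowsSeenInBundle = new HashSet<>();

  PerBundleGcTimerSketch(LongConsumer setGcTimer) {
    this.setGcTimer = setGcTimer;
  }

  // Forget the windows from the previous bundle.
  void startBundle() {
    windowsSeenInBundle.clear();
  }

  // Called for every element; only the first element of a given window
  // in this bundle triggers the timer call.
  void processElement(long windowEndMillis) {
    if (windowsSeenInBundle.add(windowEndMillis)) {
      setGcTimer.accept(windowEndMillis);
    }
  }
}
```

This only reduces how often the timer call is made. It does not address whatever makes a single timer set or delete expensive in the first place, which is the bigger win Kenn points at.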