In that case, Robert's point is quite valid. The old Flink runner I believe had no knowledge of fusion, which was known to make it extremely slow. A lot of work went into making the portable runner fusion aware, so we don't need to round trip through coders on every ParDo.
Reuven On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote: > It was not a portable Flink runner. > > Thanks all for the thoughts, I will create JIRAs, as suggested, with my > findings and send them out > > On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote: > >> Jozef did you use the portable Flink runner or the old one? >> >> Reuven >> >> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <rober...@google.com> >> wrote: >> >>> Thanks for starting this investigation. As mentioned, most of the work >>> to date has been on feature parity, not performance parity, but we're >>> at the point that the latter should be tackled as well. Even if there >>> is a slight overhead (and there's talk about integrating more deeply >>> with the Flume DAG that could elide even that) I'd expect it should be >>> nowhere near the 3x that you're seeing. Aside from the timer issue, >>> sounds like the cloning via coders is is a huge drag that needs to be >>> addressed. I wonder if this is one of those cases where using the >>> portability framework could be a performance win (specifically, no >>> cloning would happen between operators of fused stages, and the >>> cloning between operators could be on the raw bytes[] (if needed at >>> all, because we know they wouldn't be mutated). >>> >>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <k...@apache.org> >>> wrote: >>> > >>> > Specifically, a lot of shared code assumes that repeatedly setting a >>> timer is nearly free / the same cost as determining whether or not to set >>> the timer. ReduceFnRunner has been refactored in a way so it would be very >>> easy to set the GC timer once per window that occurs in a bundle, but >>> there's probably some underlying inefficiency around why this isn't cheap >>> that would be a bigger win. >>> > >>> > Kenn >>> > >>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote: >>> >> >>> >> I think the short answer is that folks working on the BeamFlink >>> runner have mostly been focused on getting everything working, and so have >>> not dug into this performance too deeply. I suspect that there is >>> low-hanging fruit to optimize as a result. >>> >> >>> >> You're right that ReduceFnRunner schedules a timer for each element. >>> I think this code dates back to before Beam; on Dataflow timers are >>> identified by tag, so this simply overwrites the existing timer which is >>> very cheap in Dataflow. If it is not cheap on Flink, this might be >>> something to optimize. >>> >> >>> >> Reuven >>> >> >>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com> >>> wrote: >>> >>> >>> >>> Hello, >>> >>> >>> >>> I am interested in any knowledge or thoughts on what should be / is >>> an overhead of running Beam pipelines instead of pipelines written on "bare >>> runner". Is this something which is being tested or investigated by >>> community? Is there a consensus in what bounds should the overhead >>> typically be? I realise this is very runner specific, but certain things >>> are imposed also by SDK model itself. >>> >>> >>> >>> I tested simple streaming pipeline on Flink vs Beam-Flink and found >>> very noticeable differences. I want to stress out, it was not a performance >>> test. Job does following: >>> >>> >>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors >>> -> Reshuffle -> Report counter.inc() to metrics for throughput >>> >>> >>> >>> Both jobs had same configuration and same state backed with same >>> checkpointing strategy. What I noticed from few simple test runs: >>> >>> >>> >>> * first run on Flink 1.5.0 from CPU profiles on one worker I have >>> found out that ~50% time was spend either on removing timers from >>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from >>> CoderUtils.clone() >>> >>> >>> >>> * problem with timer delete was addressed by FLINK-9423. I have >>> retested on Flink 1.7.2 and there was not much time is spend in timer >>> delete now, but root cause was not removed. It still remains that timers >>> are frequently registered and removed ( I believe from >>> ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called >>> per processed element? ) which is noticeable in GC activity, Heap and >>> State ... >>> >>> >>> >>> * in Flink I use FileSystem state backed which keeps state in memory >>> CopyOnWriteStateTable which after some time is full of PaneInfo objects. >>> Maybe they come from PaneInfoTracker activity >>> >>> >>> >>> * Coder clone is painfull. Pure Flink job does copy between >>> operators too, in my case it is via Kryo.copy() but this is not noticeable >>> in CPU profile. Kryo.copy() does copy on object level not boject -> bytes >>> -> object which is cheaper >>> >>> >>> >>> Overall, my observation is that pure Flink can be roughly 3x faster. >>> >>> >>> >>> I do not know what I am trying to achieve here :) Probably just >>> start a discussion and collect thoughts and other experiences on the cost >>> of running some data processing on Beam and particular runner. >>> >>> >>> >>