Hi Jozef,

Yes, there is potential for overhead when running Beam pipelines on different runners. The Beam model defines an execution framework which each runner implements in a slightly different way.

Timers in Flink, for example, are uniquely identified by a namespace and a timestamp. In Beam, they are identified only by a namespace, and any pending timer with the same namespace gets overwritten when a new timer for that namespace is set. To implement this on top of Flink, we have to maintain a table of timers keyed by namespace; though it seems this did not cause a slowdown in your case.
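To make the difference concrete, here is a minimal sketch (illustrative only, not the actual Beam or Flink API; all class and method names are made up) of emulating Beam's "one pending timer per namespace" semantics on top of a service that identifies timers by (namespace, timestamp):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Beam-style "one timer per namespace" semantics
// emulated on top of a Flink-style service where a timer is identified
// by (namespace, timestamp). The table remembers the last timestamp
// registered per namespace so the old underlying timer can be deleted
// before the replacement is registered.
class NamespacedTimerTable {
    private final Map<String, Long> pendingByNamespace = new HashMap<>();

    /**
     * Records a new timer for the namespace. Returns the timestamp of the
     * previously pending timer (which the caller would cancel in the
     * underlying timer service), or null if none was pending.
     */
    Long setTimer(String namespace, long timestamp) {
        return pendingByNamespace.put(namespace, timestamp);
    }

    /** Returns the currently pending timestamp for the namespace, or null. */
    Long getPending(String namespace) {
        return pendingByNamespace.get(namespace);
    }

    /** Removes the pending entry once the timer fires or is cancelled. */
    void clear(String namespace) {
        pendingByNamespace.remove(namespace);
    }
}
```

The extra map lookup per timer operation is the kind of small per-element bookkeeping the runner has to do to bridge the two models.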

I think it would be very helpful to compile a list of issues that could slow down pipelines. How about filing JIRA issues for what you discovered during profiling? We could use a "performance" tag for discoverability. I'd be eager to investigate some of those.

Thanks,
Max

PS: We have performance regression tests: https://apache-beam-testing.appspot.com/explore?dashboard=5699257587728384

On 29.04.19 12:47, Jozef Vilcek wrote:
Hello,

I am interested in any knowledge or thoughts on what the overhead of running Beam pipelines is, or should be, compared to pipelines written on a "bare" runner. Is this something being tested or investigated by the community? Is there a consensus on what bounds the overhead should typically fall within? I realise this is very runner-specific, but certain things are also imposed by the SDK model itself.

I tested a simple streaming pipeline on Flink vs. Beam-on-Flink and found very noticeable differences. I want to stress that it was not a performance test. The job does the following:

Read Kafka -> Deserialize to Proto -> Filter deserialisation errors -> Reshuffle -> Report counter.inc() to metrics for throughput

Both jobs had the same configuration and the same state backend with the same checkpointing strategy. What I noticed from a few simple test runs:

* in the first run, on Flink 1.5.0, CPU profiles from one worker showed that ~50% of the time was spent either on removing timers from HeapInternalTimerService or in java.io.ByteArrayOutputStream from CoderUtils.clone()

* the problem with timer deletes was addressed by FLINK-9423. I retested on Flink 1.7.2 and not much time is spent in timer deletes now, but the root cause was not removed. Timers are still frequently registered and removed (I believe from ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is called per processed element?), which is noticeable in GC activity, heap and state ...

* in Flink I use the FileSystem state backend, which keeps state in an in-memory CopyOnWriteStateTable that after some time is full of PaneInfo objects. Maybe they come from PaneInfoTracker activity

* Coder clone is painful. A pure Flink job also copies between operators, in my case via Kryo.copy(), but that is not noticeable in the CPU profile. Kryo.copy() copies at the object level, not object -> bytes -> object, which is cheaper
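To illustrate the last point, here is a small self-contained sketch (this is not Beam's CoderUtils or Kryo, just plain Java standard-library serialization standing in for the same pattern) of the two cloning strategies:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative element type; stands in for a deserialized record.
final class Event implements Serializable {
    final String key;
    final long value;
    Event(String key, long value) { this.key = key; this.value = value; }
}

final class Clones {
    // Bytes round-trip: object -> bytes -> object. Analogous to a
    // coder-based clone, which allocates a ByteArrayOutputStream and
    // re-parses the element on every copy.
    static Event cloneViaBytes(Event e) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(e);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (Event) in.readObject();
        }
    }

    // Object-level copy: just duplicate the fields, no encoding step.
    // Analogous to what Kryo.copy() can do for known types.
    static Event cloneViaFields(Event e) {
        return new Event(e.key, e.value);
    }
}
```

Both produce an independent copy, but the bytes round-trip pays for stream allocation, encoding and decoding on every element, which is exactly what shows up in the ByteArrayOutputStream hot spot above.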

Overall, my observation is that pure Flink can be roughly 3x faster.

I do not know exactly what I am trying to achieve here :) Probably just to start a discussion and collect thoughts and other experiences on the cost of running data processing on Beam and a particular runner.
