Hi Jozef,
Yes, there is potential for overhead when running Beam pipelines on
different Runners. The Beam model has an execution framework which each
Runner utilizes in a slightly different way.
Timers in Flink, for example, are uniquely identified by a namespace and
a timestamp. In Beam, they are only identified by a namespace and any
pending timers with the same namespace will get overwritten in case a
new timer with the same namespace is set. To implement this on top of
Flink, we have to maintain a table of timers by namespace; though it
seems this did not cause a slowdown in your case.
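To make the difference in timer semantics concrete, here is a minimal
sketch of such a table. It is not the actual Flink runner code:
RawTimerService and NamespaceTimerTable are hypothetical names, and the
raw service only imitates a timer store keyed by (namespace, timestamp).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a timer service where a timer is uniquely
// identified by (namespace, timestamp). Not a real Flink class.
class RawTimerService {
    private final Map<String, Set<Long>> timers = new HashMap<>();

    void register(String namespace, long timestamp) {
        timers.computeIfAbsent(namespace, k -> new TreeSet<>()).add(timestamp);
    }

    void delete(String namespace, long timestamp) {
        Set<Long> forNamespace = timers.get(namespace);
        if (forNamespace != null) {
            forNamespace.remove(timestamp);
            if (forNamespace.isEmpty()) {
                timers.remove(namespace);
            }
        }
    }

    boolean contains(String namespace, long timestamp) {
        Set<Long> forNamespace = timers.get(namespace);
        return forNamespace != null && forNamespace.contains(timestamp);
    }

    int size() {
        int total = 0;
        for (Set<Long> forNamespace : timers.values()) {
            total += forNamespace.size();
        }
        return total;
    }
}

// Hypothetical adapter: at most one pending timer per namespace, so
// setting a timer overwrites any pending one with the same namespace.
class NamespaceTimerTable {
    private final RawTimerService raw;
    // The extra bookkeeping: namespace -> timestamp of the pending timer.
    private final Map<String, Long> pendingByNamespace = new HashMap<>();

    NamespaceTimerTable(RawTimerService raw) {
        this.raw = raw;
    }

    void setTimer(String namespace, long timestamp) {
        Long previous = pendingByNamespace.put(namespace, timestamp);
        if (previous != null && previous.longValue() != timestamp) {
            // Overwrite: the old timer has to be deleted explicitly.
            raw.delete(namespace, previous);
        }
        raw.register(namespace, timestamp);
    }

    void deleteTimer(String namespace) {
        Long previous = pendingByNamespace.remove(namespace);
        if (previous != null) {
            raw.delete(namespace, previous);
        }
    }
}
```

Every overwrite costs one extra map lookup plus a delete on the
underlying service, which is the kind of per-timer overhead meant above.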
I think it would be very helpful to compile a list of issues that could
slow down pipelines. How about filing JIRA issues for what you
discovered during profiling? We could use a "performance" tag for
discoverability. I'd be eager to investigate some of those.
Thanks,
Max
PS: We have performance regression tests:
https://apache-beam-testing.appspot.com/explore?dashboard=5699257587728384
On 29.04.19 12:47, Jozef Vilcek wrote:
Hello,
I am interested in any knowledge or thoughts on what the overhead of
running Beam pipelines is, or should be, compared to pipelines written
on a "bare runner". Is this something that is being tested or
investigated by the community? Is there a consensus on what bounds the
overhead should typically fall within? I realise this is very runner
specific, but certain things are also imposed by the SDK model itself.
I tested a simple streaming pipeline on Flink vs Beam-Flink and found
very noticeable differences. I want to stress that it was not a
performance test. The job does the following:
Read Kafka -> Deserialize to Proto -> Filter deserialisation errors ->
Reshuffle -> Report counter.inc() to metrics for throughput
Both jobs had the same configuration and the same state backend with the
same checkpointing strategy. What I noticed from a few simple test runs:
* on the first run, on Flink 1.5.0, CPU profiles from one worker showed
that ~50% of the time was spent either on removing timers
from HeapInternalTimerService or in java.io.ByteArrayOutputStream from
CoderUtils.clone()
* the problem with timer deletion was addressed by FLINK-9423. I
retested on Flink 1.7.2 and not much time is spent on timer deletion
now, but the root cause was not removed. Timers are still frequently
registered and removed (I believe
from ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it
is called per processed element?), which is noticeable in GC activity,
Heap and State ...
* in Flink I use the FileSystem state backend, which keeps state in an
in-memory CopyOnWriteStateTable that after some time is full of PaneInfo
objects. Maybe they come from PaneInfoTracker activity
* Coder clone is painful. A pure Flink job copies between operators
too, in my case via Kryo.copy(), but this is not noticeable in the CPU
profile. Kryo.copy() copies at the object level, not object -> bytes ->
object, which is cheaper
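
To illustrate the last point, here is a minimal sketch of the two copy
strategies. It uses neither Beam's CoderUtils nor Kryo: Event and
CopyStrategies are hypothetical names, and the hand-written encoding
only stands in for what a coder would do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical immutable element type, used only for illustration.
final class Event {
    final String key;
    final long value;

    Event(String key, long value) {
        this.key = key;
        this.value = value;
    }
}

final class CopyStrategies {
    // object -> bytes -> object: encode into a byte buffer, then decode.
    // This allocates a growing buffer and a byte[] for every element.
    static Event cloneViaBytes(Event event) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeUTF(event.key);
                out.writeLong(event.value);
            }
            byte[] bytes = buffer.toByteArray();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes))) {
                return new Event(in.readUTF(), in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Object-level copy: build the new object directly from the fields,
    // with no intermediate byte representation.
    static Event cloneDirect(Event event) {
        return new Event(event.key, event.value);
    }
}
```

Both methods return an equal but distinct Event. The byte round trip
additionally pays for the buffer, the byte array and the re-decoding of
every field, which matches the ByteArrayOutputStream time in the profile.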
Overall, my observation is that pure Flink can be roughly 3x faster.
I do not know what I am trying to achieve here :) Probably just to start
a discussion and collect thoughts and other experiences on the cost of
running some data processing on Beam and a particular runner.