Specifically, a lot of shared code assumes that repeatedly setting a timer
is nearly free, i.e. the same cost as determining whether or not to set the
timer. ReduceFnRunner has been refactored so that it would be easy to set
the GC timer once per window that occurs in a bundle, but there is probably
some underlying inefficiency that makes timer setting expensive in the
first place, and fixing that would be a bigger win.
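[The "once per window per bundle" idea above can be sketched in plain Java. This is a hypothetical illustration, not Beam's actual ReduceFnRunner code: it deduplicates timer writes within a bundle using a per-bundle set of windows already handled.]

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: instead of (re)setting the GC timer for every
// element, remember which windows already had their timer set in the
// current bundle and skip the redundant writes.
class PerBundleTimerSetter {
    private final Map<String, Long> timers = new HashMap<>(); // windowId -> GC time
    private final Set<String> setThisBundle = new HashSet<>();
    int timerWrites = 0; // counts actual writes to the timer service

    void startBundle() {
        setThisBundle.clear();
    }

    void onElement(String windowId, long gcTime) {
        // add() returns true only for the first element of this window
        // in the bundle, so the timer service is touched once per window.
        if (setThisBundle.add(windowId)) {
            timers.put(windowId, gcTime);
            timerWrites++;
        }
    }
}
```

[With this shape, a bundle of 1000 elements in one window performs a single timer write instead of 1000.]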

Kenn

On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:

> I think the short answer is that folks working on the BeamFlink runner
> have mostly been focused on getting everything working, and so have not dug
> into this performance too deeply. I suspect that there is low-hanging fruit
> to optimize as a result.
>
> You're right that ReduceFnRunner schedules a timer for each element. I
> think this code dates back to before Beam; on Dataflow timers are
> identified by tag, so this simply overwrites the existing timer which is
> very cheap in Dataflow. If it is not cheap on Flink, this might be
> something to optimize.
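[The tag-identified timer behavior Reuven describes can be sketched as a map keyed by tag, where setting an existing tag simply overwrites the previous deadline. This is an illustrative stand-in, not Dataflow's or Flink's actual timer service.]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of tag-identified timers: setting a timer whose tag already
// exists overwrites it, so a per-element reset is a cheap map put rather
// than a delete followed by a re-register.
class TagTimerService {
    private final Map<String, Long> timersByTag = new HashMap<>();

    void setTimer(String tag, long fireTime) {
        timersByTag.put(tag, fireTime); // overwrite, no separate delete
    }

    int pendingTimers() {
        return timersByTag.size();
    }
}
```

[A runner whose timer store is a priority queue without tag lookup would instead pay a delete plus insert on every reset, which is where the cost difference comes from.]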
>
> Reuven
>
> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I am interested in any knowledge or thoughts on what the overhead of
>> running Beam pipelines should be, or is, compared to pipelines written
>> directly against the "bare runner". Is this something being tested or
>> investigated by the community? Is there a consensus on what bounds the
>> overhead should typically fall within? I realise this is very runner
>> specific, but certain things are also imposed by the SDK model itself.
>>
>> I tested a simple streaming pipeline on Flink vs Beam-on-Flink and found
>> very noticeable differences. I want to stress that it was not a
>> performance test. The job does the following:
>>
>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors ->
>> Reshuffle -> Report counter.inc() to metrics for throughput
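[The pipeline shape above can be mimicked in plain Java to make the stages concrete. This is a hypothetical stand-in, not the actual Beam or Flink job: parsing stands in for proto deserialization, and the count stands in for the throughput counter.]

```java
import java.util.List;
import java.util.Objects;

// Hypothetical plain-Java sketch of the measured pipeline's shape:
// deserialize, drop failures, count survivors for throughput.
class PipelineShapeDemo {
    // stand-in for proto deserialization; returns null on failure
    static Integer deserialize(String raw) {
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    static long run(List<String> records) {
        return records.stream()
                .map(PipelineShapeDemo::deserialize)
                .filter(Objects::nonNull) // filter deserialization errors
                .count();                 // counter.inc() equivalent
    }
}
```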
>>
>> Both jobs had the same configuration and the same state backend with the
>> same checkpointing strategy. What I noticed from a few simple test runs:
>>
>> * in the first run, on Flink 1.5.0, CPU profiles on one worker showed
>> that ~50% of the time was spent either removing timers
>> from HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>> CoderUtils.clone()
>>
>> * the timer-delete problem was addressed by FLINK-9423. I retested on
>> Flink 1.7.2 and not much time is spent in timer deletion now, but the
>> root cause was not removed. Timers are still frequently registered and
>> removed ( I believe
>> from ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is
>> called per processed element? ), which is noticeable in GC activity, heap
>> and state ...
>>
>> * in Flink I use the FileSystem state backend, which keeps state in an
>> in-memory CopyOnWriteStateTable that after some time is full of PaneInfo
>> objects. Maybe they come from PaneInfoTracker activity
>>
>> * Coder clone is painful. A pure Flink job also copies between
>> operators, in my case via Kryo.copy(), but this is not noticeable in the
>> CPU profile. Kryo.copy() copies at the object level, not object -> bytes
>> -> object, which is cheaper
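[The cost difference described above can be sketched in plain Java. The code below is an illustration of the two cloning strategies, not Beam's CoderUtils or Kryo internals: a coder-style clone allocates streams and a byte[] per element, while an object-level copy is a single allocation.]

```java
import java.io.*;

// Sketch contrasting a coder-style clone (object -> bytes -> object)
// with a direct object-level copy, to show where the allocation cost
// comes from. Names are illustrative, not Beam APIs.
class CloneCostDemo {
    private static byte[] encode(int[] values) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(values.length);
        for (int v : values) out.writeInt(v);
        return bos.toByteArray(); // a fresh byte[] per clone
    }

    private static int[] decode(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int[] values = new int[in.readInt()];
        for (int i = 0; i < values.length; i++) values[i] = in.readInt();
        return values;
    }

    // Coder-style clone: two streams plus an intermediate byte[] per element.
    static int[] cloneViaBytes(int[] values) {
        try {
            return decode(encode(values));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Kryo-style object-level copy: one array allocation, no streams.
    static int[] cloneDirect(int[] values) {
        return values.clone();
    }
}
```

[Both produce an independent copy, but the byte-roundtrip version creates several short-lived objects per element, which matches the ByteArrayOutputStream hotspot seen in the profile.]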
>>
>> Overall, my observation is that pure Flink can be roughly 3x faster.
>>
>> I do not know what I am trying to achieve here :) Probably just start a
>> discussion and collect thoughts and other experiences on the cost of
>> running some data processing on Beam and particular runner.
>>
>>
>
