On Fri, May 3, 2019 at 9:29 AM Viliam Durina <vil...@hazelcast.com> wrote:
>
> > you MUST NOT mutate your inputs
> I think it's enough to not mutate the inputs after you emit them. From this 
> follows that when you receive an input, the upstream vertex will not try to 
> mutate it in parallel. This is what Hazelcast Jet expects. We have no option 
> to automatically clone objects after each step.

There's also the case of sibling fusion. E.g. if your graph looks like

   ---> B
 /
A
 \
   ---> C

which all gets fused together, then both B and C are applied to each
output of A, which means it is not safe for either B or C to mutate its
input, lest its sibling (whichever is applied second) see this
mutation.
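
A minimal sketch of that hazard, in plain Python rather than any Beam
API. The names b_mutating, b_pure, c and fused are hypothetical
stand-ins for the transforms and the fused stage, and the sketch assumes
the fused stage hands the very same object to B first and then to C.

```python
def b_mutating(element):
    # Hypothetical transform B: doubles the value in place,
    # which violates the "do not mutate your inputs" rule.
    element["value"] *= 2
    return element["value"]


def b_pure(element):
    # Same logic as above, but the input is left untouched.
    return element["value"] * 2


def c(element):
    # Hypothetical transform C: only reads its input.
    return element["value"] + 1


def fused(a_outputs, b, c):
    # Assumed fused execution: no copy is made between siblings,
    # so B and then C receive the same object for each output of A.
    return [(b(e), c(e)) for e in a_outputs]


mutated = fused([{"value": 10}], b_mutating, c)
clean = fused([{"value": 10}], b_pure, c)

print(mutated)  # C saw B's mutation: [(20, 21)]
print(clean)    # C saw the original value: [(20, 11)]
```

With the mutating B, the result of C depends on the order in which the
siblings happen to be applied, which is exactly what a copy between
operators would otherwise hide.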

> On Thu, 2 May 2019 at 20:01, Maximilian Michels <m...@apache.org> wrote:
>>
>> > I am not sure what you are referring to here. What do you mean Kryo is 
>> > simply slower ... Beam Kryo or Flink Kryo or?
>>
>> Flink uses Kryo as a fallback serializer when its own type serialization
>> system can't analyze the type. I'm just guessing here that this could be
>> slower.
>>
>> On 02.05.19 16:51, Jozef Vilcek wrote:
>> >
>> >
>> > On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <m...@apache.org
>> > <mailto:m...@apache.org>> wrote:
>> >
>> >     Thanks for the JIRA issues Jozef!
>> >
>> >      > So the feature in Flink is operator chaining and Flink by
>> >     default initiates a copy of input elements. In the case of Beam coders, the copy
>> >     seems to be more noticeable than in native Flink.
>> >
>> >     Copying between chained operators can be turned off in the
>> >     FlinkPipelineOptions (if you know what you're doing).
>> >
>> >
>> > Yes, I know that it can be instructed to reuse objects (if you are
>> > referring to this). I am just not sure I want to open this door in
>> > general :)
>> > But it is interesting to learn that with portability, this will be
>> > turned on by default. Quite an important finding imho.
>> >
>> >     Beam coders should
>> >     not be slower than Flink's. They are simply wrapped. It seems Kryo is
>> >     simply slower, which we could fix by providing more type hints to Flink.
>> >
>> >
>> > I am not sure what you are referring to here. What do you mean Kryo is
>> > simply slower ... Beam Kryo or Flink Kryo or?
>> >
>> >     -Max
>> >
>> >     On 02.05.19 13:15, Robert Bradshaw wrote:
>> >      > Thanks for filing those.
>> >      >
>> >      > As for how not doing a copy is "safe," it's not really. Beam simply
>> >      > asserts that you MUST NOT mutate your inputs (and direct runners,
>> >      > which are used during testing, do perform extra copies and checks to
>> >      > catch violations of this requirement).
>> >      >
>> >      > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
>> >     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
>> >      >>
>> >      >> I have created
>> >      >> https://issues.apache.org/jira/browse/BEAM-7204
>> >      >> https://issues.apache.org/jira/browse/BEAM-7206
>> >      >>
>> >      >> to track these topics further
>> >      >>
>> >      >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek
>> >     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
>> >      >>>
>> >      >>>
>> >      >>>
>> >      >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles
>> >     <k...@apache.org <mailto:k...@apache.org>> wrote:
>> >      >>>>
>> >      >>>>
>> >      >>>>
>> >      >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com
>> >     <mailto:re...@google.com>> wrote:
>> >      >>>>>
>> >      >>>>> In that case, Robert's point is quite valid. The old Flink
>> >     runner I believe had no knowledge of fusion, which was known to make
>> >     it extremely slow. A lot of work went into making the portable
>> >     runner fusion aware, so we don't need to round trip through coders
>> >     on every ParDo.
>> >      >>>>
>> >      >>>>
>> >      >>>> The old Flink runner got fusion for free, since Flink does it.
>> >     The new fusion in portability is because fusing the runner side of
>> >     portability steps does not achieve real fusion
>> >      >>>
>> >      >>>
>> >      >>> Aha, I see. So the feature in Flink is operator chaining and
>> >     Flink by default initiates a copy of input elements. In the case of Beam
>> >     coders, the copy seems to be more noticeable than in native Flink.
>> >      >>> So do I get it right that in portable runner scenario, you do
>> >     similar chaining via this "fusion of stages"? Curious here... how is
>> >     it different from chaining so runner can be sure that not doing copy
>> >     is "safe" with respect to user defined functions and their behaviour
>> >     over inputs?
>> >      >>>
>> >      >>>>>
>> >      >>>>>
>> >      >>>>> Reuven
>> >      >>>>>
>> >      >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek
>> >     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
>> >      >>>>>>
>> >      >>>>>> It was not a portable Flink runner.
>> >      >>>>>>
>> >      >>>>>> Thanks all for the thoughts, I will create JIRAs, as
>> >     suggested, with my findings and send them out
>> >      >>>>>>
>> >      >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax
>> >     <re...@google.com <mailto:re...@google.com>> wrote:
>> >      >>>>>>>
>> >      >>>>>>> Jozef did you use the portable Flink runner or the old one?
>> >      >>>>>>>
>> >      >>>>>>> Reuven
>> >      >>>>>>>
>> >      >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw
>> >     <rober...@google.com <mailto:rober...@google.com>> wrote:
>> >      >>>>>>>>
>> >      >>>>>>>> Thanks for starting this investigation. As mentioned, most
>> >     of the work
>> >      >>>>>>>> to date has been on feature parity, not performance
>> >     parity, but we're
>> >      >>>>>>>> at the point that the latter should be tackled as well.
>> >     Even if there
>> >      >>>>>>>> is a slight overhead (and there's talk about integrating
>> >     more deeply
>> >      >>>>>>>> with the Flume DAG that could elide even that) I'd expect
>> >     it should be
>> >      >>>>>>>> nowhere near the 3x that you're seeing. Aside from the
>> >     timer issue,
>> >      >>>>>>>> sounds like the cloning via coders is a huge drag that
>> >     needs to be
>> >      >>>>>>>> addressed. I wonder if this is one of those cases where
>> >     using the
>> >      >>>>>>>> portability framework could be a performance win
>> >     (specifically, no
>> >      >>>>>>>> cloning would happen between operators of fused stages,
>> >     and the
>> >      >>>>>>>> cloning between operators could be on the raw bytes[] (if
>> >     needed at
>> >      >>>>>>>> all, because we know they wouldn't be mutated)).
>> >      >>>>>>>>
>> >      >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles
>> >     <k...@apache.org <mailto:k...@apache.org>> wrote:
>> >      >>>>>>>>>
>> >      >>>>>>>>> Specifically, a lot of shared code assumes that
>> >     repeatedly setting a timer is nearly free / the same cost as
>> >     determining whether or not to set the timer. ReduceFnRunner has been
>> >     refactored in a way so it would be very easy to set the GC timer
>> >     once per window that occurs in a bundle, but there's probably some
>> >     underlying inefficiency around why this isn't cheap that would be a
>> >     bigger win.
>> >      >>>>>>>>>
>> >      >>>>>>>>> Kenn
>> >      >>>>>>>>>
>> >      >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax
>> >     <re...@google.com <mailto:re...@google.com>> wrote:
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> I think the short answer is that folks working on the
>> >     Beam Flink runner have mostly been focused on getting everything
>> >     working, and so have not dug into this performance too deeply. I
>> >     suspect that there is low-hanging fruit to optimize as a result.
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for
>> >     each element. I think this code dates back to before Beam; on
>> >     Dataflow timers are identified by tag, so this simply overwrites the
>> >     existing timer which is very cheap in Dataflow. If it is not cheap
>> >     on Flink, this might be something to optimize.
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> Reuven
>> >      >>>>>>>>>>
>> >      >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek
>> >     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Hello,
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> I am interested in any knowledge or thoughts on what
>> >     the overhead of running Beam pipelines, compared with pipelines
>> >     written on the "bare runner", is or should be. Is this something which
>> >     is being tested or investigated by the community? Is there a consensus
>> >     on the bounds the overhead should typically fall within? I realise this
>> >     is very runner specific, but certain things are also imposed by the SDK
>> >     model itself.
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> I tested a simple streaming pipeline on Flink vs
>> >     Beam-Flink and found very noticeable differences. I want to stress
>> >     that it was not a performance test. The job does the following:
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter
>> >     deserialisation errors -> Reshuffle -> Report counter.inc() to
>> >     metrics for throughput
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Both jobs had the same configuration and the same state
>> >     backend with the same checkpointing strategy. What I noticed from a few
>> >     simple test runs:
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * first run on Flink 1.5.0: from CPU profiles on one
>> >     worker I found that ~50% of the time was spent either on removing
>> >     timers from HeapInternalTimerService or in
>> >     java.io.ByteArrayOutputStream from CoderUtils.clone()
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * the problem with timer deletion was addressed by
>> >     FLINK-9423. I have retested on Flink 1.7.2 and not much time is
>> >     spent in timer deletion now, but the root cause was not removed.
>> >     It still remains that timers are frequently registered and removed
>> >     (I believe from ReduceFnRunner.scheduleGarbageCollectionTimer(), in
>> >     which case it is called per processed element?), which is
>> >     noticeable in GC activity, Heap and State ...
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * in Flink I use the FileSystem state backend, which keeps
>> >     state in an in-memory CopyOnWriteStateTable, which after some time is
>> >     full of PaneInfo objects. Maybe they come from PaneInfoTracker activity
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> * Coder clone is painful. A pure Flink job does copy
>> >     between operators too, in my case via Kryo.copy(), but this is
>> >     not noticeable in the CPU profile. Kryo.copy() copies on the object
>> >     level, not object -> bytes -> object, which is cheaper
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> Overall, my observation is that pure Flink can be
>> >     roughly 3x faster.
>> >      >>>>>>>>>>>
>> >      >>>>>>>>>>> I do not know what I am trying to achieve here :)
>> >     Probably just start a discussion and collect thoughts and other
>> >     experiences on the cost of running some data processing on Beam and
>> >     particular runner.
>> >      >>>>>>>>>>>
>> >
