On Fri, May 3, 2019 at 9:29 AM Viliam Durina <vil...@hazelcast.com> wrote:
>
> > you MUST NOT mutate your inputs
>
> I think it's enough to not mutate the inputs after you emit them. From this
> follows that when you receive an input, the upstream vertex will not try to
> mutate it in parallel. This is what Hazelcast Jet expects. We have no option
> to automatically clone objects after each step.
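To make that contract concrete, here is a minimal conceptual sketch in plain Python (not a real Jet or Beam API; the function names are hypothetical). Once an element has been emitted, the runner may hand the very same reference to downstream operators, so mutating it afterwards corrupts what downstream already received:

```python
# Conceptual sketch (plain Python, not a real Jet or Beam API): once an
# element is emitted, the runner may pass the same reference downstream
# instead of cloning it, so the emitter must never touch it again.

def unsafe_process(elements, emit):
    buf = []
    for e in elements:
        buf.append(e)    # BUG: keeps mutating an object that was already emitted
        emit(buf)        # every downstream consumer sees the same mutating list

def safe_process(elements, emit):
    buf = []
    for e in elements:
        buf.append(e)
        emit(list(buf))  # OK: emit a snapshot; the emitted object is never touched again
```

In the unsafe version, all emitted references point at one list whose contents keep changing; the safe version freezes each emitted value by copying before emission.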
There's also the case of sibling fusion. E.g. if your graph looks like

      ---> B
     /
    A
     \
      ---> C

which all gets fused together, then both B and C are applied to each output
of A, which means it is not safe for B and C to mutate their inputs, lest the
sibling (whichever is applied second) see this mutation.

> On Thu, 2 May 2019 at 20:01, Maximilian Michels <m...@apache.org> wrote:
>>
>> > I am not sure what are you referring to here. What do you mean Kryo is
>> > simply slower ... Beam Kryo or Flink Kryo or?
>>
>> Flink uses Kryo as a fallback serializer when its own type serialization
>> system can't analyze the type. I'm just guessing here that this could be
>> slower.
>>
>> On 02.05.19 16:51, Jozef Vilcek wrote:
>> >
>> > On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <m...@apache.org> wrote:
>> >
>> > Thanks for the JIRA issues Jozef!
>> >
>> > > So the feature in Flink is operator chaining, and Flink per default
>> > > initiates a copy of input elements. In case of Beam, coder copy
>> > > seems to be more noticeable than native Flink.
>> >
>> > Copying between chained operators can be turned off in the
>> > FlinkPipelineOptions (if you know what you're doing).
>> >
>> > Yes, I know that it can be instructed to reuse objects (if you are
>> > referring to this). I am just not sure I want to open this door in
>> > general :)
>> > But it is interesting to learn that with portability, this will be
>> > turned on per default. Quite an important finding imho.
>> >
>> > Beam coders should not be slower than Flink's. They are simply
>> > wrapped. It seems Kryo is simply slower, which we could fix by
>> > providing more type hints to Flink.
>> >
>> > I am not sure what are you referring to here. What do you mean Kryo is
>> > simply slower ... Beam Kryo or Flink Kryo or?
>> >
>> > -Max
>> >
>> > On 02.05.19 13:15, Robert Bradshaw wrote:
>> > > Thanks for filing those.
>> > >
>> > > As for how not doing a copy is "safe," it's not really.
Beam simply
>> > > asserts that you MUST NOT mutate your inputs (and direct runners,
>> > > which are used during testing, do perform extra copies and checks to
>> > > catch violations of this requirement).
>> > >
>> > > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>> > >>
>> > >> I have created
>> > >> https://issues.apache.org/jira/browse/BEAM-7204
>> > >> https://issues.apache.org/jira/browse/BEAM-7206
>> > >>
>> > >> to track these topics further.
>> > >>
>> > >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>> > >>>
>> > >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <k...@apache.org> wrote:
>> > >>>>
>> > >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com> wrote:
>> > >>>>>
>> > >>>>> In that case, Robert's point is quite valid. The old Flink runner, I
>> > >>>>> believe, had no knowledge of fusion, which was known to make it
>> > >>>>> extremely slow. A lot of work went into making the portable runner
>> > >>>>> fusion-aware, so we don't need to round trip through coders on every
>> > >>>>> ParDo.
>> > >>>>
>> > >>>> The old Flink runner got fusion for free, since Flink does it. The new
>> > >>>> fusion in portability is because fusing the runner side of portability
>> > >>>> steps does not achieve real fusion.
>> > >>>
>> > >>> Aha, I see. So the feature in Flink is operator chaining, and Flink per
>> > >>> default initiates a copy of input elements. In case of Beam, coder copy
>> > >>> seems to be more noticeable than native Flink.
>> > >>> So do I get it right that in the portable runner scenario, you do
>> > >>> similar chaining via this "fusion of stages"? Curious here... how is it
>> > >>> different from chaining, so the runner can be sure that not doing a copy
>> > >>> is "safe" with respect to user-defined functions and their behaviour
>> > >>> over inputs?
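The sibling-fusion hazard mentioned at the top of this thread can be sketched in a few lines of plain Python (hypothetical names, not real runner code). When B and C are fused onto the same output of A, there is no defensive copy between them, so the second sibling observes any mutation made by the first:

```python
# Hypothetical sketch (plain Python, not real runner code) of fused siblings:
# B and C both consume the output of A. After fusion they are applied one
# after another to the *same* object -- avoiding the copy is the point of fusion.

def fused_apply(a_output, b_fn, c_fn):
    # No clone between fused siblings.
    return b_fn(a_output), c_fn(a_output)

def b_mutating(x):
    x["seen_by"] = "B"           # BUG: mutates the shared input element
    return x["seen_by"]

def c_reading(x):
    # C expects a pristine element but may observe B's mutation.
    return x.get("seen_by", "none")
```

With `b_mutating` applied first, `c_reading` returns "B" instead of "none", which is exactly why a fusing runner must forbid input mutation.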
>> > >>>
>> > >>>>>
>> > >>>>> Reuven
>> > >>>>>
>> > >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>> > >>>>>>
>> > >>>>>> It was not a portable Flink runner.
>> > >>>>>>
>> > >>>>>> Thanks all for the thoughts. I will create JIRAs, as suggested, with
>> > >>>>>> my findings and send them out.
>> > >>>>>>
>> > >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
>> > >>>>>>>
>> > >>>>>>> Jozef, did you use the portable Flink runner or the old one?
>> > >>>>>>>
>> > >>>>>>> Reuven
>> > >>>>>>>
>> > >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <rober...@google.com> wrote:
>> > >>>>>>>>
>> > >>>>>>>> Thanks for starting this investigation. As mentioned, most of the work
>> > >>>>>>>> to date has been on feature parity, not performance parity, but we're
>> > >>>>>>>> at the point that the latter should be tackled as well. Even if there
>> > >>>>>>>> is a slight overhead (and there's talk about integrating more deeply
>> > >>>>>>>> with the Flume DAG that could elide even that), I'd expect it to be
>> > >>>>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue, it
>> > >>>>>>>> sounds like the cloning via coders is a huge drag that needs to be
>> > >>>>>>>> addressed. I wonder if this is one of those cases where using the
>> > >>>>>>>> portability framework could be a performance win (specifically, no
>> > >>>>>>>> cloning would happen between operators of fused stages, and the
>> > >>>>>>>> cloning between operators could be on the raw byte[], if needed at
>> > >>>>>>>> all, because we know they wouldn't be mutated).
>> > >>>>>>>>
>> > >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <k...@apache.org> wrote:
>> > >>>>>>>>>
>> > >>>>>>>>> Specifically, a lot of shared code assumes that repeatedly setting a
>> > >>>>>>>>> timer is nearly free / the same cost as determining whether or not to
>> > >>>>>>>>> set the timer. ReduceFnRunner has been refactored in a way so it would
>> > >>>>>>>>> be very easy to set the GC timer once per window that occurs in a
>> > >>>>>>>>> bundle, but there's probably some underlying inefficiency around why
>> > >>>>>>>>> this isn't cheap that would be a bigger win.
>> > >>>>>>>>>
>> > >>>>>>>>> Kenn
>> > >>>>>>>>>
>> > >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:
>> > >>>>>>>>>>
>> > >>>>>>>>>> I think the short answer is that folks working on the Beam Flink
>> > >>>>>>>>>> runner have mostly been focused on getting everything working, and
>> > >>>>>>>>>> so have not dug into this performance too deeply. I suspect that
>> > >>>>>>>>>> there is low-hanging fruit to optimize as a result.
>> > >>>>>>>>>>
>> > >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for each element.
>> > >>>>>>>>>> I think this code dates back to before Beam; on Dataflow timers are
>> > >>>>>>>>>> identified by tag, so this simply overwrites the existing timer,
>> > >>>>>>>>>> which is very cheap in Dataflow. If it is not cheap on Flink, this
>> > >>>>>>>>>> might be something to optimize.
>> > >>>>>>>>>>
>> > >>>>>>>>>> Reuven
>> > >>>>>>>>>>
>> > >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Hello,
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I am interested in any knowledge or thoughts on what should be / is
>> > >>>>>>>>>>> the overhead of running Beam pipelines instead of pipelines written
>> > >>>>>>>>>>> on a "bare" runner. Is this something which is being tested or
>> > >>>>>>>>>>> investigated by the community? Is there a consensus in what bounds
>> > >>>>>>>>>>> the overhead should typically be?
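[The timers-identified-by-tag point above can be sketched as follows. This is a hypothetical structure, not Beam's or Dataflow's actual internals: when timers live in a map keyed by tag, re-setting the GC timer once per element is a cheap O(1) overwrite of a single entry, whereas a heap-based timer service may pay a delete-and-reinsert per element.]

```python
# Hypothetical sketch, not Beam's or Dataflow's actual internals: timers keyed
# by tag make "set the GC timer on every element" an overwrite of one map
# entry, so only one timer per window ever exists.

class TagKeyedTimers:
    def __init__(self):
        self._timers = {}              # tag -> firing timestamp

    def set(self, tag, timestamp):
        self._timers[tag] = timestamp  # overwrite; no delete-and-reinsert

    def count(self):
        return len(self._timers)
```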
>> > >>>>>>>>>>> I realise this is very runner specific, but certain things are
>> > >>>>>>>>>>> imposed also by the SDK model itself.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I tested a simple streaming pipeline on Flink vs Beam-on-Flink and
>> > >>>>>>>>>>> found very noticeable differences. I want to stress, it was not a
>> > >>>>>>>>>>> performance test. The job does the following:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors
>> > >>>>>>>>>>> -> Reshuffle -> Report counter.inc() to metrics for throughput
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Both jobs had the same configuration and the same state backend
>> > >>>>>>>>>>> with the same checkpointing strategy. What I noticed from a few
>> > >>>>>>>>>>> simple test runs:
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> * On the first run on Flink 1.5.0, from CPU profiles on one worker
>> > >>>>>>>>>>> I found out that ~50% of time was spent either on removing timers
>> > >>>>>>>>>>> from HeapInternalTimerService or in java.io.ByteArrayOutputStream
>> > >>>>>>>>>>> from CoderUtils.clone()
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> * The problem with timer deletes was addressed by FLINK-9423. I
>> > >>>>>>>>>>> retested on Flink 1.7.2 and not much time is spent in timer deletes
>> > >>>>>>>>>>> now, but the root cause was not removed. It still remains that
>> > >>>>>>>>>>> timers are frequently registered and removed (I believe from
>> > >>>>>>>>>>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it
>> > >>>>>>>>>>> is called per processed element?), which is noticeable in GC
>> > >>>>>>>>>>> activity, Heap and State ...
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> * In Flink I use the FileSystem state backend, which keeps state in
>> > >>>>>>>>>>> an in-memory CopyOnWriteStateTable that after some time is full of
>> > >>>>>>>>>>> PaneInfo objects. Maybe they come from PaneInfoTracker activity.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> * Coder clone is painful. A pure Flink job does copy between
>> > >>>>>>>>>>> operators too, in my case via Kryo.copy(), but this is
not noticeable in the CPU profile.
>> > >>>>>>>>>>> Kryo.copy() does the copy on the object level, not
>> > >>>>>>>>>>> object -> bytes -> object, which is cheaper.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> Overall, my observation is that pure Flink can be roughly 3x faster.
>> > >>>>>>>>>>>
>> > >>>>>>>>>>> I do not know what I am trying to achieve here :) Probably just
>> > >>>>>>>>>>> start a discussion and collect thoughts and other experiences on
>> > >>>>>>>>>>> the cost of running some data processing on Beam and a particular
>> > >>>>>>>>>>> runner.
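[The two cloning strategies discussed in this thread can be sketched in plain Python, with pickle standing in for a Beam coder's encode/decode and copy.deepcopy standing in for Kryo.copy(); the real implementations differ, this only illustrates the shape of the cost difference.]

```python
import copy
import pickle

# Sketch of the two cloning strategies (pickle as a stand-in for a Beam coder,
# copy.deepcopy as a stand-in for Kryo.copy(); real implementations differ):

def coder_style_clone(obj):
    # object -> bytes -> object: pays for both encoding and decoding
    return pickle.loads(pickle.dumps(obj))

def object_level_copy(obj):
    # object -> object: duplicates the graph directly, no byte round trip
    return copy.deepcopy(obj)
```

Both produce an independent deep copy; the byte round trip additionally pays serialization cost per element, which is the drag the CPU profiles above point at.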