We should probably run some benchmarks to see what the overhead of always copying between chained operators actually is.
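A very rough sketch of what such a measurement could look like, just to make the idea concrete. Nothing here uses Flink: `Event`, `runChain` and the two-stage loop are hypothetical stand-ins for a record type and a chain of two operators. Plain `System.nanoTime()` timing is unreliable (the JIT may even eliminate the copy entirely), so a serious benchmark should use a harness such as JMH and run inside Flink with the real record types.

```java
public class ChainCopyBench {

    // Hypothetical mutable record; not a Flink class.
    static final class Event {
        long id;
        double value;

        Event(long id, double value) {
            this.id = id;
            this.value = value;
        }

        Event copy() {
            return new Event(id, value);
        }
    }

    // Pushes n records through two "chained" stages. Stage 1 mutates and
    // emits one reused object; the optional copy happens on the hand-over;
    // stage 2 reads the record and folds it into a checksum.
    static double runChain(int n, boolean copyBetween) {
        Event reused = new Event(0, 0.0);
        double sum = 0.0;
        for (int i = 0; i < n; i++) {
            reused.id = i;
            reused.value = i * 0.5;
            Event handedOver = copyBetween ? reused.copy() : reused;
            sum += handedOver.value + 1.0;
        }
        return sum;
    }

    public static void main(String[] args) {
        int n = 10_000_000;
        // Several rounds so that later rounds run on JIT-compiled code.
        for (int round = 0; round < 5; round++) {
            long t0 = System.nanoTime();
            double noCopy = runChain(n, false);
            long t1 = System.nanoTime();
            double withCopy = runChain(n, true);
            long t2 = System.nanoTime();
            System.out.printf(
                "round %d: no copy %d ms, copy %d ms (checksums %.1f / %.1f)%n",
                round, (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000,
                noCopy, withCopy);
        }
    }
}
```

Both variants must produce the same checksum; only the timings should differ. The numbers from a toy loop like this say little about real jobs, where the copy cost depends on the record type and how it is copied.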

On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org> wrote:

> A vote is the last resort. Consensus through discussion is much nicer. And
> I think we are making progress.
>
> We went for the lightweight version in the batch API, because
>  - there are few cases that are affected (only functions with side effect
>    state)
>  - you can always switch lightweight -> failsafe in the future (only
>    hardens guarantees), but not the other way around (dropping guarantees)
>    without breaking existing code.
>
> If you feel strongly about that point, I do not want to be a blocker for
> this. I only ask to make a well informed decision, as this behavior is as
> much part of the API as the classname of the DataStream.
>
> On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> I would go for the Failsafe option as a default behaviour with a clearly
>> documented lightweight (no-copy) setting, but I think having a Vote on
>> this would be the proper way of settling this question.
>>
>> On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> I think that in the long run (maybe not too long) we will have to
>>> change our stateful operators (windows, basically) to use managed
>>> memory and spill to disk. (Think jobs that have sliding windows over
>>> days or weeks.) Then the internal operators will take care of
>>> copying anyways. The problem Gyula mentioned we cannot tackle other
>>> than by defining how user code must behave.
>>>
>>> On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> It does not mean we have to behave the same way, it is just an
>>>> indication that well-defined behavior can allow you to mess things up.
>>>>
>>>> The question is now what is the default mode:
>>>>  - Failsafe/Heavy (always copy)
>>>>  - Performance/Lightweight (do not copy)
>>>>
>>>> On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> This is something that we can clearly define as "should not be done".
>>>>> Systems do that.
>>>>> I think if you repeatedly emit (or mutate) the same object for example
>>>>> in Spark, you get an RDD with completely messed up contents.
>>>>>
>>>>> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>
>>>>>> If the preceding operator is emitting a mutated object, or does
>>>>>> something with the output object afterwards, then it's a problem.
>>>>>>
>>>>>> Emitting the same object is a special case of this.
>>>>>>
>>>>>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <se...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> The case you are making is if a preceding operator in a chain is
>>>>>>> repeatedly emitting the same object, and the succeeding operator is
>>>>>>> gathering the objects, then it is a problem.
>>>>>>>
>>>>>>> Or are there cases where the system itself repeatedly emits the
>>>>>>> same objects?
>>>>>>>
>>>>>>> On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyf...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> We are designing a system for stateful stream computations,
>>>>>>>> assuming long standing operators that gather and store data as the
>>>>>>>> stream evolves (unlike in the dataset api). Many programs, like
>>>>>>>> windowing, sampling etc. hold the state in the form of past data.
>>>>>>>> And without careful understanding of the runtime these programs
>>>>>>>> will break or have unnecessary copies.
>>>>>>>>
>>>>>>>> This is why I think immutability should be the default so we can
>>>>>>>> have a clear dataflow model with immutable streams.
>>>>>>>>
>>>>>>>> I see absolutely no reason why we can't have the non-copy version
>>>>>>>> as an optional setting for the users.
>>>>>>>>
>>>>>>>> On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <par...@kth.se>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> @stephan I see your point. If we assume that operators do not
>>>>>>>>> hold references in their state to any transmitted records it
>>>>>>>>> works fine. We therefore need to make this clear to the users. I
>>>>>>>>> need to check if that would break semantics in SAMOA or other
>>>>>>>>> integrations as well that assume immutability. For example in
>>>>>>>>> SAMOA there are often local metric objects that are being
>>>>>>>>> constantly mutated and simply forwarded periodically to other
>>>>>>>>> (possibly chained) operators that need to evaluate them.
>>>>>>>>>
>>>>>>>>> ________________________________________
>>>>>>>>> From: Gyula Fóra <gyf...@apache.org>
>>>>>>>>> Sent: Wednesday, May 20, 2015 2:06 PM
>>>>>>>>> To: dev@flink.apache.org
>>>>>>>>> Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
>>>>>>>>>
>>>>>>>>> "Copy before putting it into a window buffer and any other group
>>>>>>>>> buffer."
>>>>>>>>>
>>>>>>>>> Exactly my point. Any stateful operator should be able to
>>>>>>>>> implement something like this without having to worry about
>>>>>>>>> copying the object (and at this point the user would need to know
>>>>>>>>> whether it comes from the network to avoid unnecessary copies),
>>>>>>>>> so I don't agree with leaving the copy off.
>>>>>>>>>
>>>>>>>>> The user can of course specify that the operator is mutable if he
>>>>>>>>> wants (and he is worried about the performance), but I still
>>>>>>>>> think the default behaviour should be immutable.
>>>>>>>>> We cannot force users to not hold object references, and it is
>>>>>>>>> also quite an unnatural way of programming in a language like
>>>>>>>>> Java.
>>>>>>>>>
>>>>>>>>> On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I am curious why the copying is actually needed.
>>>>>>>>>>
>>>>>>>>>> In the batch API, we chain and do not copy and it is rather
>>>>>>>>>> predictable.
>>>>>>>>>>
>>>>>>>>>> The cornerpoints of that design are the following rules:
>>>>>>>>>>
>>>>>>>>>>  1) Objects read from the network or any buffer are always new
>>>>>>>>>>     objects. That comes naturally when they are deserialized as
>>>>>>>>>>     part of that (all buffers store serialized).
>>>>>>>>>>
>>>>>>>>>>  2) After a function has returned a record (or given one to the
>>>>>>>>>>     collector), it is given to the chain of chained operators,
>>>>>>>>>>     but after it is through the chain, no one else holds a
>>>>>>>>>>     reference to that object.
>>>>>>>>>>     For that, it is crucial that objects are not stored by
>>>>>>>>>>     reference, but either stored serialized, or a copy is
>>>>>>>>>>     stored.
>>>>>>>>>>
>>>>>>>>>> This is quite solid in the batch API. How about we follow the
>>>>>>>>>> same paradigm in the streaming API? We would need to adjust the
>>>>>>>>>> following:
>>>>>>>>>>
>>>>>>>>>>  1) Do not copy between operators (I think this is the case
>>>>>>>>>>     right now)
>>>>>>>>>>
>>>>>>>>>>  2) Copy before putting it into a window buffer and any other
>>>>>>>>>>     group buffer.
>>>>>>>>>>
>>>>>>>>>> On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek
>>>>>>>>>> <aljos...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, in fact I anticipated this. There is one central place
>>>>>>>>>>> where we can insert a copy step, in OperatorCollector in
>>>>>>>>>>> OutputHandler.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <par...@kth.se>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I guess it was not intended ^^.
>>>>>>>>>>>>
>>>>>>>>>>>> Chaining should be transparent and not break the
>>>>>>>>>>>> correct/expected behaviour.
>>>>>>>>>>>>
>>>>>>>>>>>> Paris?
>>>>>>>>>>>>
>>>>>>>>>>>> On 20 May 2015, at 11:02, Márton Balassi <mbala...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> +1 for copying.
>>>>>>>>>>>>
>>>>>>>>>>>> On May 20, 2015 10:50 AM, "Gyula Fóra" <gyf...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hey,
>>>>>>>>>>>>
>>>>>>>>>>>> The latest streaming operator rework removed the copying of
>>>>>>>>>>>> the outputs before passing them to chained operators. This is
>>>>>>>>>>>> a major break for the previous operator semantics which
>>>>>>>>>>>> guaranteed immutability.
>>>>>>>>>>>>
>>>>>>>>>>>> I think this change leads to very nondeterministic program
>>>>>>>>>>>> behaviour from the user's perspective as only non-chained
>>>>>>>>>>>> outputs/inputs will be mutable. If we allow this to happen,
>>>>>>>>>>>> users will start disabling chaining to get immutability which
>>>>>>>>>>>> defeats the purpose. (Chaining should not affect program
>>>>>>>>>>>> behaviour, just increase performance.)
>>>>>>>>>>>>
>>>>>>>>>>>> In my opinion the default setting for each operator should be
>>>>>>>>>>>> immutability and the user could override this manually if
>>>>>>>>>>>> he/she wants.
>>>>>>>>>>>>
>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Gyula
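
For anyone reading the thread later, the hazard under discussion (one operator in a chain repeatedly emitting the same mutated object while the next operator buffers references to it) can be shown with a small standalone sketch. It deliberately does not use Flink's API: `Counter`, `emitCounts` and `buffered` are hypothetical names, and the `copyBetweenOperators` flag only mimics the failsafe versus lightweight choice debated above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ChainedCopyDemo {

    // Hypothetical mutable record; not a Flink class.
    static final class Counter {
        int count;

        Counter(int count) {
            this.count = count;
        }

        Counter copy() {
            return new Counter(count);
        }
    }

    // Upstream "operator": mutates a single object and emits it three times.
    static void emitCounts(Consumer<Counter> out) {
        Counter reused = new Counter(0);
        for (int i = 1; i <= 3; i++) {
            reused.count = i;
            out.accept(reused);
        }
    }

    // Downstream "operator": stores every record it receives by reference,
    // the way a naive window buffer would. Returns the buffered counts.
    static List<Integer> buffered(boolean copyBetweenOperators) {
        List<Counter> window = new ArrayList<>();
        Consumer<Counter> downstream = window::add;

        Consumer<Counter> chainEdge;
        if (copyBetweenOperators) {
            // Failsafe mode: each record is copied on the hand-over.
            chainEdge = record -> downstream.accept(record.copy());
        } else {
            // Lightweight mode: the same reference is passed along.
            chainEdge = downstream;
        }

        emitCounts(chainEdge);

        List<Integer> result = new ArrayList<>();
        for (Counter c : window) {
            result.add(c.count);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("no copy: " + buffered(false)); // [3, 3, 3]
        System.out.println("copy:    " + buffered(true));  // [1, 2, 3]
    }
}
```

Without the copy, all three buffered entries alias the same object and show its last value. With the copy, the buffer holds the values as they were emitted. The alternative proposed in the thread, copying inside the buffering operator instead of on the chain edge, would fix the same case by changing the `window::add` step rather than the hand-over.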