That's fine, you convinced me ;-) And given a flag to deactivate it, I think it should be okay for everyone.
Once we have proper serialized window buffers, the number of copies should go down quite a bit anyway...

On Wed, May 20, 2015 at 4:29 PM, Gyula Fóra <gyf...@apache.org> wrote:

This is not about me, please don't get me wrong :) It would be good if other people would share their opinions as well.

I am just trying to make the point that other systems do this as well, for a reason. Users are used to this abstraction.

On Wed, May 20, 2015 at 4:18 PM, Stephan Ewen <se...@apache.org> wrote:

I think it is fair to say that everything Flink has in its core provides immutability. The mutability effect only appears if the user starts mutating objects across functions.

The overhead will depend heavily on whether you are sending small or large records.

I see you are very keen on the failsafe variant. That is fine, I'd say let's go ahead.

Then let us introduce a switch. The switch should only control the copies made for user functions. Until the window buffers are serialized, we need to keep the copies there.

On Wed, May 20, 2015 at 3:55 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

I know it is nicer to have no copies from a performance perspective, but a dataflow system with no immutability guarantee is very hard to describe.

Systems like Storm and Google Dataflow have immutability guarantees, I think for the same reason: to provide very clear, easy-to-use semantics.

On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org> wrote:

A vote is the last resort. Consensus through discussion is much nicer, and I think we are making progress.
We went for the lightweight version in the batch API because:
- there are few cases that are affected (only functions with side-effect state), and
- you can always switch from lightweight to failsafe in the future (which only hardens guarantees), but not the other way around (dropping guarantees) without breaking existing code.

If you feel strongly about that point, I do not want to be a blocker for this. I only ask that we make a well-informed decision, as this behavior is as much part of the API as the class name of the DataStream.

On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

I would go for the failsafe option as the default behaviour, with a clearly documented lightweight (no-copy) setting, but I think a vote would be the proper way of settling this question.

On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

I think that in the long run (maybe not too long) we will have to change our stateful operators (windows, basically) to use managed memory and spill to disk (think of jobs that have sliding windows over days or weeks). Then the internal operators will take care of copying anyway. The problem Gyula mentioned we cannot tackle other than by defining how user code must behave.

On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <se...@apache.org> wrote:

It does not mean we have to behave the same way; it is just an indication that a system with well-defined behavior can still let you mess things up.
The question now is what the default mode should be:
- Failsafe/Heavy (always copy)
- Performance/Lightweight (do not copy)

On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <se...@apache.org> wrote:

This is something that we can clearly define as "should not be done". Other systems do that. I think if you repeatedly emit (or mutate) the same object in Spark, for example, you get an RDD with completely messed-up contents.

On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyf...@apache.org> wrote:

If the preceding operator emits a mutated object, or does something with the output object afterwards, then it's a problem. Emitting the same object is a special case of this.

On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <se...@apache.org> wrote:

The case you are making is that if a preceding operator in a chain repeatedly emits the same object, and the succeeding operator gathers the objects, then there is a problem.

Or are there cases where the system itself repeatedly emits the same objects?
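A minimal plain-Java sketch of the case Stephan describes, assuming one chained upstream function that reuses a single mutable object and a downstream operator that buffers whatever it receives. `Reading`, `WindowBuffer` and `run` are hypothetical names, not Flink classes; the `copyOnBuffer` flag models the "copy before putting it into a window buffer" rule proposed elsewhere in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of two chained operators; none of these
// classes are Flink APIs.
public class ReuseHazard {

    // Stand-in for a mutable user type such as a POJO or tuple.
    static final class Reading {
        int value;

        Reading(int value) { this.value = value; }

        Reading copy() { return new Reading(value); }
    }

    // Downstream operator that gathers records; copyOnBuffer models the rule
    // "copy before putting it into a window buffer".
    static final class WindowBuffer {
        private final List<Reading> buffer = new ArrayList<>();
        private final boolean copyOnBuffer;

        WindowBuffer(boolean copyOnBuffer) { this.copyOnBuffer = copyOnBuffer; }

        void add(Reading r) { buffer.add(copyOnBuffer ? r.copy() : r); }

        List<Integer> contents() {
            List<Integer> values = new ArrayList<>();
            for (Reading r : buffer) {
                values.add(r.value);
            }
            return values;
        }
    }

    // Upstream operator that mutates and re-emits one instance; the chained
    // call passes the reference straight through, with no copy in between.
    public static List<Integer> run(int[] inputs, boolean copyOnBuffer) {
        WindowBuffer window = new WindowBuffer(copyOnBuffer);
        Reading reused = new Reading(0);
        for (int in : inputs) {
            reused.value = in;
            window.add(reused);
        }
        return window.contents();
    }

    public static void main(String[] args) {
        int[] inputs = {1, 2, 3};
        System.out.println(run(inputs, false)); // [3, 3, 3]
        System.out.println(run(inputs, true));  // [1, 2, 3]
    }
}
```

Without the copy, all three buffered entries alias the same object and show its last value; copying at the buffer restores the expected contents without copying on every chained call.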
On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyf...@apache.org> wrote:

We are designing a system for stateful stream computations, assuming long-standing operators that gather and store data as the stream evolves (unlike in the DataSet API). Many programs, like windowing, sampling, etc., hold their state in the form of past data. Without a careful understanding of the runtime, these programs will either break or make unnecessary copies.

This is why I think immutability should be the default, so we can have a clear dataflow model with immutable streams.

I see absolutely no reason why we can't have the no-copy version as an optional setting for users.

On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <par...@kth.se> wrote:

@stephan I see your point. If we assume that operators do not hold references to any transmitted records in their state, it works fine. We therefore need to make this clear to users. I need to check whether that would break semantics in SAMOA or other integrations that assume immutability.
For example, in SAMOA there are often local metric objects that are constantly mutated and simply forwarded periodically to other (possibly chained) operators that need to evaluate them.

________________________________________
From: Gyula Fóra <gyf...@apache.org>
Sent: Wednesday, May 20, 2015 2:06 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Re-add record copy to chained operator calls

"Copy before putting it into a window buffer and any other group buffer."

Exactly my point. Any stateful operator should be able to implement something like this without having to worry about copying the object (and at that point the user would need to know whether the object comes from the network, to avoid unnecessary copies), so I don't agree with leaving the copy off.

The user can of course specify that the operator is mutable if he wants to (and is worried about performance), but I still think the default behaviour should be immutable. We cannot force users not to hold object references, and it is also quite an unnatural way of programming in a language like Java.
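The SAMOA pattern Paris mentions, and Gyula's point that behaviour depends on whether a record arrived over the network, can be sketched as a small hypothetical model. It assumes a network edge behaves like a serialize/deserialize round trip (modelled here simply as constructing a new object) and a chained edge passes the reference through unchanged; `Metric`, `edge` and `evaluate` are illustrative names only, not SAMOA or Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the SAMOA-style pattern: an operator keeps one local
// metric object, mutates it constantly and forwards it periodically.
public class MetricForwarding {

    static final class Metric {
        long count;

        Metric(long count) { this.count = count; }
    }

    // A chained edge hands over the reference; a network edge is modelled as
    // serialize + deserialize, i.e. the receiver always gets a fresh object.
    static Consumer<Metric> edge(boolean chained, Consumer<Metric> receiver) {
        if (chained) {
            return receiver;
        }
        return m -> receiver.accept(new Metric(m.count));
    }

    // Upstream counts 10 events and forwards its metric after every 5th one;
    // the evaluator keeps what it receives and inspects it afterwards.
    public static List<Long> evaluate(boolean chained) {
        List<Metric> received = new ArrayList<>();
        Consumer<Metric> out = edge(chained, received::add);
        Metric local = new Metric(0);
        for (int event = 1; event <= 10; event++) {
            local.count++;
            if (event % 5 == 0) {
                out.accept(local);
            }
        }
        List<Long> seen = new ArrayList<>();
        for (Metric m : received) {
            seen.add(m.count);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(evaluate(false)); // [5, 10]   network edge
        System.out.println(evaluate(true));  // [10, 10]  chained edge, no copy
    }
}
```

The same downstream code sees `[5, 10]` over the network edge but `[10, 10]` when chained, which is the kind of chaining-dependent behaviour the thread is worried about.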
On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org> wrote:

I am curious why the copying is actually needed.

In the batch API, we chain and do not copy, and it is rather predictable.

The cornerstones of that design are these rules:

1) Objects read from the network or any buffer are always new objects. That comes naturally, since they are deserialized as part of reading (all buffers store serialized data).

2) After a function returns a record (or gives one to the collector), it is given to the chain of chained operators, but once it is through the chain, no one else holds a reference to that object. For that, it is crucial that objects are not stored by reference, but either stored serialized or stored as a copy.

This is quite solid in the batch API. How about we follow the same paradigm in the streaming API?
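Rule 1 relies on buffers holding serialized data. A rough sketch of that idea, assuming a toy record with two fields written through `java.io.DataOutputStream`; `SerializedBuffer`, `Reading` and `reuseThenRead` are hypothetical and far simpler than a managed-memory buffer would be.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that stores records only as bytes, so reading always
// deserializes new objects and no caller reference is ever retained.
public class SerializedBuffer {

    static final class Reading {
        int id;
        double value;

        Reading(int id, double value) {
            this.id = id;
            this.value = value;
        }
    }

    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(bytes);
    private int size;

    // Copies the fields into the byte stream; the caller may reuse r afterwards.
    void add(Reading r) {
        try {
            out.writeInt(r.id);
            out.writeDouble(r.value);
            size++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserializes every stored record into a fresh object.
    List<Reading> readAll() {
        try {
            out.flush();
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            List<Reading> result = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                result.add(new Reading(in.readInt(), in.readDouble()));
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Upstream reuses one object for n records, then the ids are read back.
    public static List<Integer> reuseThenRead(int n) {
        SerializedBuffer buffer = new SerializedBuffer();
        Reading reused = new Reading(0, 0.0);
        for (int i = 1; i <= n; i++) {
            reused.id = i;
            reused.value = i * 1.5;
            buffer.add(reused);
        }
        List<Integer> ids = new ArrayList<>();
        for (Reading r : buffer.readAll()) {
            ids.add(r.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(reuseThenRead(3)); // [1, 2, 3]
    }
}
```

Because only bytes are stored, the upstream function may keep mutating its object, and each `readAll()` call hands out new instances, so no explicit copy is needed at the buffer.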
We would need to adjust the following:

1) Do not copy between operators (I think this is the case right now).

2) Copy before putting a record into a window buffer or any other group buffer.

On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Yes, in fact I anticipated this. There is one central place where we can insert a copy step: in OperatorCollector in OutputHandler.

On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <par...@kth.se> wrote:

I guess it was not intended ^^.

Chaining should be transparent and not break the correct/expected behaviour.

Paris

On 20 May 2015, at 11:02, Márton Balassi <mbala...@apache.org> wrote:

+1 for copying.
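Aljoscha's message names a single spot for the copy step (OperatorCollector in OutputHandler). The following is a hypothetical stand-in for that idea, not Flink's actual class or API: one collector between chained operators, with a boolean switch between the failsafe (always copy) and lightweight (pass the reference) modes discussed in the thread. The `copier` function is an assumption; in practice a type serializer would likely provide the deep copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a single, central output collector in an operator
// chain; it is NOT Flink's OperatorCollector, just a model of the idea.
public class CopyingChainCollector<T> implements Consumer<T> {

    private final Consumer<T> next;        // next operator in the chain
    private final UnaryOperator<T> copier; // e.g. a serializer-based deep copy
    private final boolean failsafe;        // true: always copy; false: pass the reference

    public CopyingChainCollector(Consumer<T> next, UnaryOperator<T> copier, boolean failsafe) {
        this.next = next;
        this.copier = copier;
        this.failsafe = failsafe;
    }

    @Override
    public void accept(T record) {
        next.accept(failsafe ? copier.apply(record) : record);
    }

    // Demo: an upstream function reuses one int[] for three emits, and the
    // downstream operator simply keeps everything it receives.
    public static List<Integer> valuesSeen(boolean failsafe) {
        List<int[]> received = new ArrayList<>();
        CopyingChainCollector<int[]> out =
            new CopyingChainCollector<>(received::add, a -> a.clone(), failsafe);
        int[] reused = new int[1];
        for (int i = 1; i <= 3; i++) {
            reused[0] = i;
            out.accept(reused);
        }
        List<Integer> values = new ArrayList<>();
        for (int[] r : received) {
            values.add(r[0]);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(valuesSeen(true));  // [1, 2, 3]  failsafe (copy)
        System.out.println(valuesSeen(false)); // [3, 3, 3]  lightweight (no copy)
    }
}
```

Keeping the switch in one collector leaves user functions untouched, which fits the idea of a single flag to deactivate copying while internal buffers handle their own copies.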
On May 20, 2015 10:50 AM, "Gyula Fóra" <gyf...@apache.org> wrote:

Hey,

The latest streaming operator rework removed the copying of the outputs before passing them to chained operators. This is a major break from the previous operator semantics, which guaranteed immutability.

I think this change leads to very nondeterministic program behaviour from the user's perspective, as only chained outputs/inputs will be mutable. If we allow this to happen, users will start disabling chaining to get immutability, which defeats the purpose (chaining should not affect program behaviour, just increase performance).

In my opinion, the default setting for each operator should be immutability, and the user could override this manually if he/she wants.

What do you think?
Regards,
Gyula