I think it is fair to say that everything Flink has in its core provides immutability. The mutability effect only appears if the user starts mutating objects across functions.

The overhead will depend heavily on whether you are sending small or large records.

I see you are very keen on the failsafe variant. That is fine; I'd say let's go ahead. Then let us introduce a switch. The switch should only control the copies made for user functions. Until the window buffers store records in serialized form, we also need to keep the copies there.

On Wed, May 20, 2015 at 3:55 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> I know it is nicer to have no copying from a performance perspective, but a dataflow system with no immutability guarantee is very hard to describe.
>
> Systems like Storm and Google Dataflow have immutability guarantees, I think for the same reason: to provide very clear, easy-to-use semantics.

On Wed, May 20, 2015 at 3:45 PM, Stephan Ewen <se...@apache.org> wrote:
> A vote is the last resort. Consensus through discussion is much nicer, and I think we are making progress.
>
> We went for the lightweight version in the batch API because
> - there are few cases that are affected (only functions with side-effect state), and
> - you can always switch from lightweight to failsafe in the future (which only hardens guarantees), but not the other way around (dropping guarantees) without breaking existing code.
>
> If you feel strongly about that point, I do not want to be a blocker. I only ask that we make a well-informed decision, as this behavior is as much part of the API as the class name of the DataStream.

On Wed, May 20, 2015 at 3:41 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> I would go for the failsafe option as the default behaviour, with a clearly documented lightweight (no-copy) setting, but I think a vote would be the proper way of settling this question.
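The failsafe/lightweight switch can be pictured as a flag on whatever hands records from one chained operator to the next. The following is a minimal sketch, assuming a per-type copy function is available; `CopyMode`, `ChainedOutput` and `CopyCostDemo` are hypothetical names, not Flink classes. It counts copies to show what the failsafe mode costs: one copy per record per chained hop, which is why the overhead depends on record size.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical names throughout; this is not Flink's actual operator chaining code.
enum CopyMode { FAILSAFE, LIGHTWEIGHT }

/** Forwards a record to the next chained operator, copying it first in FAILSAFE mode. */
final class ChainedOutput<T> implements Consumer<T> {
    private final CopyMode mode;
    private final UnaryOperator<T> copier; // assumed type-specific deep copy
    private final Consumer<T> next;        // the next operator in the chain

    ChainedOutput(CopyMode mode, UnaryOperator<T> copier, Consumer<T> next) {
        this.mode = mode;
        this.copier = copier;
        this.next = next;
    }

    @Override
    public void accept(T record) {
        // FAILSAFE: downstream gets its own instance. LIGHTWEIGHT: same reference, no cost.
        next.accept(mode == CopyMode.FAILSAFE ? copier.apply(record) : record);
    }
}

final class CopyCostDemo {
    /** Pushes n records through source -> map -> sink and returns how many copies were made. */
    static int copies(CopyMode mode, int n) {
        AtomicInteger copyCount = new AtomicInteger();
        UnaryOperator<int[]> copier = r -> {
            copyCount.incrementAndGet();
            return r.clone();
        };

        Consumer<int[]> sink = r -> { /* e.g. write the record out */ };
        ChainedOutput<int[]> toSink = new ChainedOutput<>(mode, copier, sink);
        Consumer<int[]> map = r -> {
            r[0] *= 2;          // the map function mutates its input in place
            toSink.accept(r);
        };
        ChainedOutput<int[]> toMap = new ChainedOutput<>(mode, copier, map);

        for (int i = 0; i < n; i++) {
            toMap.accept(new int[] {i});
        }
        return copyCount.get();
    }

    public static void main(String[] args) {
        System.out.println(copies(CopyMode.FAILSAFE, 1000));    // 2000: two chained hops per record
        System.out.println(copies(CopyMode.LIGHTWEIGHT, 1000)); // 0
    }
}
```

Note that in this sketch the source already creates a new array per record, so every failsafe copy is pure overhead; the copies only pay off when some operator reuses or keeps its objects.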

On Wed, May 20, 2015 at 3:37 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> I think that in the long run (maybe not too long) we will have to change our stateful operators (windows, basically) to use managed memory and spill to disk (think of jobs that have sliding windows over days or weeks). Then the internal operators will take care of copying anyway. The problem Gyula mentioned cannot be tackled other than by defining how user code must behave.

On Wed, May 20, 2015 at 3:30 PM, Stephan Ewen <se...@apache.org> wrote:
> It does not mean we have to behave the same way; it is just an indication that well-defined behavior can still allow you to mess things up.
>
> The question now is what the default mode should be:
> - Failsafe/Heavy (always copy)
> - Performance/Lightweight (do not copy)

On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <se...@apache.org> wrote:
> This is something that we can clearly define as "should not be done". Systems do that.
> I think if you repeatedly emit (or mutate) the same object in Spark, for example, you get an RDD with completely messed-up contents.

On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyf...@apache.org> wrote:
> If the preceding operator emits a mutated object, or does something with the output object afterwards, then it's a problem.
>
> Emitting the same object is a special case of this.

On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <se...@apache.org> wrote:
> The case you are making is that if a preceding operator in a chain repeatedly emits the same object, and the succeeding operator gathers the objects, then there is a problem.
>
> Or are there cases where the system itself repeatedly emits the same objects?

On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyf...@apache.org> wrote:
> We are designing a system for stateful stream computations, assuming long-standing operators that gather and store data as the stream evolves (unlike in the DataSet API). Many programs, like windowing, sampling, etc., hold their state in the form of past data. Without a careful understanding of the runtime, these programs will break or make unnecessary copies.
>
> This is why I think immutability should be the default, so we can have a clear dataflow model with immutable streams.
>
> I see absolutely no reason why we can't have the no-copy version as an optional setting for users.

On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <par...@kth.se> wrote:
> @stephan I see your point. If we assume that operators do not hold references to any transmitted records in their state, it works fine. We therefore need to make this clear to users. I need to check whether that would also break semantics in SAMOA or other integrations that assume immutability. For example, in SAMOA there are often local metric objects that are constantly mutated and simply forwarded periodically to other (possibly chained) operators that need to evaluate them.

________________________________________
From: Gyula Fóra <gyf...@apache.org>
Sent: Wednesday, May 20, 2015 2:06 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Re-add record copy to chained operator calls

> "Copy before putting it into a window buffer and any other group buffer."
>
> Exactly my point. Any stateful operator should be able to implement something like this without having to worry about copying the object (and at that point the user would need to know whether it comes from the network to avoid unnecessary copies), so I don't agree with leaving the copy off.
>
> The user can of course specify that the operator is mutable if he wants to (and is worried about performance), but I still think the default behaviour should be immutable.
> We cannot force users not to hold object references, and that would also be quite an unnatural way of programming in a language like Java.
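To make the SAMOA case concrete, here is a minimal sketch, assuming a hypothetical `Metrics` class with a copy constructor, of an operator that mutates a local metrics object on every record and forwards it every `period` records to a downstream evaluator that keeps what it receives. Forwarding the same reference (as chaining without copies would do) leaves the evaluator holding several references to one object, so every entry shows the latest count; forwarding a snapshot preserves each value. `MetricForwarding` and its parameters are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical metrics object, mutated in place by its owning operator.
final class Metrics {
    long count;

    Metrics() {}

    Metrics(Metrics other) { // copy constructor used to take a snapshot
        this.count = other.count;
    }
}

final class MetricForwarding {
    /**
     * Processes `records` items, forwarding the metrics every `period` items to a
     * downstream evaluator that keeps them. Returns the counts the evaluator sees at the end.
     */
    static List<Long> run(int records, int period, boolean snapshot) {
        List<Metrics> evaluator = new ArrayList<>(); // e.g. keeps a history of metrics
        Metrics local = new Metrics();
        for (int i = 1; i <= records; i++) {
            local.count++; // in-place mutation on every record
            if (i % period == 0) {
                evaluator.add(snapshot ? new Metrics(local) : local);
            }
        }
        List<Long> observed = new ArrayList<>();
        for (Metrics m : evaluator) {
            observed.add(m.count);
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run(6, 2, true));  // [2, 4, 6]
        System.out.println(run(6, 2, false)); // [6, 6, 6]
    }
}
```

An evaluator that only reads the metrics on arrival and keeps nothing would see the right values in both modes; the problem appears only when a reference outlives the call, which is exactly the "do not hold references in state" assumption discussed in this thread.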

On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org> wrote:
> I am curious why the copying is actually needed.
>
> In the batch API, we chain and do not copy, and it is rather predictable.
>
> The cornerstones of that design are these rules:
>
> 1) Objects read from the network or any buffer are always new objects. That comes naturally when they are deserialized as part of that (all buffers store serialized data).
>
> 2) After a function returns a record (or gives one to the collector), it is given to the chain of chained operators, but after it is through the chain, no one else holds a reference to that object.
> For that, it is crucial that objects are not stored by reference, but are either stored serialized or stored as a copy.
>
> This is quite solid in the batch API. How about we follow the same paradigm in the streaming API? We would need to adjust the following:
>
> 1) Do not copy between operators (I think this is the case right now).
>
> 2) Copy before putting it into a window buffer and any other group buffer.
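Rule 1 and adjustment 2 above can be sketched as a buffer that never stores records by reference. This is a minimal illustration that uses plain Java serialization as a stand-in for Flink's own serializers (an assumption made only to keep the example self-contained); `SerializedWindowBuffer`, `Reading` and `SerializedBufferDemo` are hypothetical names. Because each `add` captures the record's bytes at that moment, an upstream operator may safely reuse its object, and every read returns fresh instances.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical window buffer that stores records serialized, never by reference. */
final class SerializedWindowBuffer<T extends Serializable> {
    private final List<byte[]> entries = new ArrayList<>();

    void add(T record) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(record);
            out.flush();                     // drain buffered data before reading the bytes
            entries.add(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Every read deserializes, so callers always receive new objects. */
    @SuppressWarnings("unchecked")
    List<T> contents() {
        List<T> result = new ArrayList<>();
        for (byte[] entry : entries) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(entry))) {
                result.add((T) in.readObject());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return result;
    }
}

final class Reading implements Serializable {
    private static final long serialVersionUID = 1L;
    int value;

    Reading(int value) { this.value = value; }
}

final class SerializedBufferDemo {
    static List<Integer> bufferedValues() {
        SerializedWindowBuffer<Reading> window = new SerializedWindowBuffer<>();
        Reading reused = new Reading(0);
        for (int i = 1; i <= 3; i++) {
            reused.value = i;   // upstream reuses one object...
            window.add(reused); // ...but the buffer keeps a serialized copy of each state
        }
        List<Integer> values = new ArrayList<>();
        for (Reading r : window.contents()) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(bufferedValues()); // [1, 2, 3]
    }
}
```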

On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Yes, in fact I anticipated this. There is one central place where we can insert a copy step: in OperatorCollector in OutputHandler.

On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <par...@kth.se> wrote:
> I guess it was not intended ^^.
>
> Chaining should be transparent and not break the correct/expected behaviour.
>
> Paris

On 20 May 2015, at 11:02, Márton Balassi <mbala...@apache.org> wrote:
> +1 for copying.

On May 20, 2015 10:50 AM, "Gyula Fóra" <gyf...@apache.org> wrote:
> Hey,
>
> The latest streaming operator rework removed the copying of the outputs before passing them to chained operators. This is a major break from the previous operator semantics, which guaranteed immutability.
>
> I think this change leads to very nondeterministic program behaviour from the user's perspective, as only non-chained outputs/inputs will be mutable. If we allow this to happen, users will start disabling chaining to get immutability, which defeats the purpose (chaining should not affect program behaviour, only increase performance).
>
> In my opinion, the default setting for each operator should be immutability, and the user could override this manually if he/she wants.
>
> What do you think?
>
> Regards,
> Gyula
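The concern that chaining changes program behaviour can be illustrated with one output feeding two consumers. The sketch below is hypothetical (`Event`, `ChainingTransparency`, `deliverChained` and `deliverOverNetwork` are made-up names) and assumes both consumers receive the same emitted record; a copy constructor stands in for the fresh object that deserialization produces on a network channel. When the first consumer mutates its input in place, what the second consumer sees depends only on whether the two were chained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;

// Hypothetical record type; the copy constructor stands in for deserialization.
final class Event {
    String name;

    Event(String name) { this.name = name; }

    Event(Event other) { this.name = other.name; }
}

final class ChainingTransparency {
    /** Chained without copies: every consumer receives the same instance. */
    static void deliverChained(Event e, List<Consumer<Event>> consumers) {
        for (Consumer<Event> c : consumers) {
            c.accept(e);
        }
    }

    /** Over the network: every consumer receives its own instance. */
    static void deliverOverNetwork(Event e, List<Consumer<Event>> consumers) {
        for (Consumer<Event> c : consumers) {
            c.accept(new Event(e));
        }
    }

    /** Returns the name the second consumer observes for a single emitted event. */
    static String secondConsumerSees(boolean chained) {
        List<String> seen = new ArrayList<>();
        Consumer<Event> upperCaser = ev -> ev.name = ev.name.toUpperCase(Locale.ROOT); // mutates input
        Consumer<Event> recorder = ev -> seen.add(ev.name);                            // only reads
        List<Consumer<Event>> consumers = Arrays.asList(upperCaser, recorder);

        Event emitted = new Event("click");
        if (chained) {
            deliverChained(emitted, consumers);
        } else {
            deliverOverNetwork(emitted, consumers);
        }
        return seen.get(0);
    }

    public static void main(String[] args) {
        System.out.println(secondConsumerSees(true));  // CLICK: saw the first consumer's mutation
        System.out.println(secondConsumerSees(false)); // click: unaffected
    }
}
```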