It does not mean we have to behave the same way; it is just an indication that well-defined behavior can still allow you to mess things up.
The question now is what the default mode should be:
- Failsafe/Heavy (always copy)
- Performance/Lightweight (do not copy)

On Wed, May 20, 2015 at 3:29 PM, Stephan Ewen <se...@apache.org> wrote:

> This is something that we can clearly define as "should not be done".
> Systems do that.
> I think if you repeatedly emit (or mutate) the same object for example in
> Spark, you get an RDD with completely messed up contents.
>
> On Wed, May 20, 2015 at 3:27 PM, Gyula Fóra <gyf...@apache.org> wrote:
>
>> If the preceding operator is emitting a mutated object, or does something
>> with the output object afterwards then it's a problem.
>>
>> Emitting the same object is a special case of this.
>>
>> On Wed, May 20, 2015 at 3:09 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> > The case you are making is if a preceding operator in a chain is repeatedly
>> > emitting the same object, and the succeeding operator is gathering the
>> > objects, then it is a problem
>> >
>> > Or are there cases where the system itself repeatedly emits the same
>> > objects?
>> >
>> > On Wed, May 20, 2015 at 3:07 PM, Gyula Fóra <gyf...@apache.org> wrote:
>> >
>> > > We are designing a system for stateful stream computations, assuming long
>> > > standing operators that gather and store data as the stream evolves (unlike
>> > > in the dataset api). Many programs, like windowing, sampling etc hold the
>> > > state in the form of past data. And without careful understanding of the
>> > > runtime these programs will break or have unnecessary copies.
>> > >
>> > > This is why I think immutability should be the default so we can have a
>> > > clear dataflow model with immutable streams.
>> > >
>> > > I see absolutely no reason why we can't have the non-copy version as an
>> > > optional setting for the users.
>> > >
>> > >
>> > > On Wed, May 20, 2015 at 2:21 PM, Paris Carbone <par...@kth.se> wrote:
>> > >
>> > > > @stephan I see your point.
>> > > > If we assume that operators do not hold
>> > > > references in their state to any transmitted records it works fine. We
>> > > > therefore need to make this clear to the users. I need to check if that
>> > > > would break semantics in SAMOA or other integrations as well that assume
>> > > > immutability. For example in SAMOA there are often local metric objects
>> > > > that are being constantly mutated and simply forwarded periodically to
>> > > > other (possibly chained) operators that need to evaluate them.
>> > > >
>> > > > ________________________________________
>> > > > From: Gyula Fóra <gyf...@apache.org>
>> > > > Sent: Wednesday, May 20, 2015 2:06 PM
>> > > > To: dev@flink.apache.org
>> > > > Subject: Re: [DISCUSS] Re-add record copy to chained operator calls
>> > > >
>> > > > "Copy before putting it into a window buffer and any other group buffer."
>> > > >
>> > > > Exactly my point. Any stateful operator should be able to implement
>> > > > something like this without having to worry about copying the object (and
>> > > > at this point the user would need to know whether it comes from the network
>> > > > to avoid unnecessary copies), so I don't agree with leaving the copy off.
>> > > >
>> > > > The user can of course specify that the operator is mutable if he wants
>> > > > (and he is worried about the performance), but I still think the default
>> > > > behaviour should be immutable.
>> > > > We cannot force users to not hold object references and also it is a quite
>> > > > unnatural way of programming in a language like Java.
>> > > >
>> > > >
>> > > > On Wed, May 20, 2015 at 1:39 PM, Stephan Ewen <se...@apache.org> wrote:
>> > > >
>> > > > > I am curious why the copying is actually needed.
>> > > > >
>> > > > > In the batch API, we chain and do not copy and it is rather predictable.
>> > > > >
>> > > > > The cornerpoints of that design are to follow these rules:
>> > > > >
>> > > > > 1) Objects read from the network or any buffer are always new objects.
>> > > > >    That comes naturally when they are deserialized as part of that (all
>> > > > >    buffers store serialized)
>> > > > >
>> > > > > 2) After a function returned a record (or gives one to the collector), it
>> > > > >    is given to the chain of chained operators, but after it is through the
>> > > > >    chain, no one else holds a reference to that object.
>> > > > >    For that, it is crucial that objects are not stored by reference, but
>> > > > >    either stored serialized, or a copy is stored.
>> > > > >
>> > > > > This is quite solid in the batch API. How about we follow the same paradigm
>> > > > > in the streaming API. We would need to adjust the following:
>> > > > >
>> > > > > 1) Do not copy between operators (I think this is the case right now)
>> > > > >
>> > > > > 2) Copy before putting it into a window buffer and any other group buffer.
>> > > > >
>> > > > >
>> > > > > On Wed, May 20, 2015 at 1:22 PM, Aljoscha Krettek <aljos...@apache.org>
>> > > > > wrote:
>> > > > >
>> > > > > > Yes, in fact I anticipated this. There is one central place where we
>> > > > > > can insert a copy step, in OperatorCollector in OutputHandler.
>> > > > > >
>> > > > > > On Wed, May 20, 2015 at 11:17 AM, Paris Carbone <par...@kth.se> wrote:
>> > > > > > > I guess it was not intended ^^.
>> > > > > > >
>> > > > > > > Chaining should be transparent and not break the correct/expected
>> > > > > > > behaviour.
>> > > > > > >
>> > > > > > >
>> > > > > > > Paris?
>> > > > > > >
>> > > > > > > On 20 May 2015, at 11:02, Márton Balassi <mbala...@apache.org> wrote:
>> > > > > > >
>> > > > > > > +1 for copying.
>> > > > > > > On May 20, 2015 10:50 AM, "Gyula Fóra" <gyf...@apache.org> wrote:
>> > > > > > >
>> > > > > > > Hey,
>> > > > > > >
>> > > > > > > The latest streaming operator rework removed the copying of the outputs
>> > > > > > > before passing them to chained operators. This is a major break for the
>> > > > > > > previous operator semantics which guaranteed immutability.
>> > > > > > >
>> > > > > > > I think this change leads to very indeterministic program behaviour from
>> > > > > > > the user's perspective as only non-chained outputs/inputs will be
>> > > > > > > mutable.
>> > > > > > > If we allow this to happen, users will start disabling chaining to get
>> > > > > > > immutability which defeats the purpose. (chaining should not affect
>> > > > > > > program behaviour just increase performance)
>> > > > > > >
>> > > > > > > In my opinion the default setting for each operator should be
>> > > > > > > immutability and the user could override this manually if he/she wants.
>> > > > > > >
>> > > > > > > What do you think?
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > > Gyula
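
For illustration, here is a minimal sketch of the hazard discussed in this thread: an upstream operator that repeatedly emits the same mutated object, and a chained downstream operator that gathers what it receives. It deliberately does not use the Flink API. The names `Counter`, `emitReused`, `bufferByReference` and `bufferWithCopy` are hypothetical, and the "chain" is assumed to be nothing more than a direct method call that hands over the reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ChainedCopyDemo {

    /** Hypothetical mutable record, standing in for a user-defined type. */
    static final class Counter {
        int value;
        Counter(int value) { this.value = value; }
        Counter copy() { return new Counter(value); }
    }

    /** Upstream operator: reuses ONE object and mutates it before every emit. */
    static void emitReused(int n, Consumer<Counter> downstream) {
        Counter reused = new Counter(0);
        for (int i = 0; i < n; i++) {
            reused.value = i;
            // Assumed chained call: the same reference is passed on each time.
            downstream.accept(reused);
        }
    }

    /** Downstream "window" that stores the incoming references as-is. */
    public static List<Integer> bufferByReference(int n) {
        List<Counter> window = new ArrayList<>();
        emitReused(n, window::add);
        return values(window);
    }

    /** Downstream "window" that copies each record before buffering it. */
    public static List<Integer> bufferWithCopy(int n) {
        List<Counter> window = new ArrayList<>();
        emitReused(n, record -> window.add(record.copy()));
        return values(window);
    }

    /** Reads out the buffered values after the upstream has finished. */
    private static List<Integer> values(List<Counter> window) {
        List<Integer> out = new ArrayList<>();
        for (Counter c : window) {
            out.add(c.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("by reference: " + bufferByReference(3)); // [2, 2, 2]
        System.out.println("with copy:    " + bufferWithCopy(3));    // [0, 1, 2]
    }
}
```

Without a copy, every buffered entry aliases the one reused object, so the window only ever sees the last value. Copying at the point of buffering, as in "Copy before putting it into a window buffer", restores the expected contents. Whether that copy belongs in the buffering operator or in the chain itself is exactly the default-mode question raised above.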