Hi everybody, thanks for your input.
I sketched a proposal for updated object-reuse semantics and documentation, based on Gabor's proposal (1), Greg's input, and the changed semantics that I discussed earlier in this thread:

--> https://docs.google.com/document/d/1jpPr2UuWlqq1iIDIo_1kmPL9QjA-sXAC9wkj-hE4PAc/edit#

Looking forward to your comments.

Fabian

(1) https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit

2016-02-20 13:04 GMT+01:00 Gábor Gévay <gga...@gmail.com>:
> Thanks, Ken! I was wondering how other systems handle these issues.
>
> Fortunately, the deep copy - shallow copy problem doesn't arise in Flink: when we copy an object, it is always a deep copy (at least, I hope so :)).
>
> Best,
> Gábor
>
>
> 2016-02-19 22:29 GMT+01:00 Ken Krugler <kkrugler_li...@transpac.com>:
> > Not sure how useful this is, but we'd run into similar issues with Cascading over the years.
> >
> > This wasn't an issue for input data, as Cascading "locks" the Tuple such that attempts to modify it will fail.
> >
> > And in general Hadoop always re-uses the data container being passed to operations, so you quickly learn not to cache those :)
> >
> > When trying to re-use a Tuple as the output of an operation, things get a bit more complicated.
> >
> > If the Tuple only contains primitive types, then there's no issue, as the (effectively) shallow copy created by the execution platform doesn't create a problem.
> >
> > If the Tuple contains an object (e.g. a nested Tuple), then there were situations where a deep copy would need to be made before passing the Tuple to the operation's output collector.
> >
> > For example, if the next (chained) operation was a map-side aggregator, then a shallow copy of the Tuple would be cached. If there's a non-primitive object, then changes to it in the upstream operation obviously bork the cached data.
> >
> > Net-net is that we wanted a way to find out, from inside an operation, whether we needed to make a deep copy of the output Tuple. But that doesn't exist (yet), so we have some utility code that checks whether a deep copy is needed (i.e., whether there are non-primitive types) and, if so, auto-clones the Tuple. That isn't very efficient, but most of our workflows only have primitive types.
> >
> > -- Ken
> >
> >> From: Fabian Hueske
> >> Sent: February 17, 2016 9:17:27am PST
> >> To: dev@flink.apache.org
> >> Subject: Guarantees for object reuse modes and documentation
> >>
> >> Hi,
> >>
> >> Flink's DataSet API features a configuration parameter called enableObjectReuse(). If activated, Flink's runtime will create fewer objects, which results in better performance and lower garbage collection overhead. Depending on whether the configuration switch is enabled or not, user functions may or may not perform certain operations on objects they receive from Flink or emit to Flink.
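For context, the switch lives on a program's ExecutionConfig. A minimal sketch of how a DataSet job opts in (the class name is just for illustration):

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ObjectReuseSetup {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Object reuse is disabled by default. Enabling it tells the
            // runtime that it may recycle the instances it passes to
            // (and receives from) user functions.
            env.getConfig().enableObjectReuse();
            // ... define and execute the DataSet program as usual
        }
    }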
> >> At the moment, there are quite a few open issues and discussions going on about the object reuse mode, including the JIRA issues FLINK-3333, FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and FLINK-3291.
> >>
> >> IMO, the most important issue is FLINK-3333, which is about improving the documentation of the object reuse mode. The current version [1] is ambiguous and includes details about operator chaining which are hard for users to understand and reason about. Hence, it is not very clear which guarantees Flink gives for objects in user functions under which conditions. This documentation needs to be improved, and I think this should happen together with the 1.0 release.
> >>
> >> Greg and Gabor proposed two new versions:
> >>
> >> 1. Greg's version [2] improves and clarifies the current documentation without significantly changing the semantics. It also discusses operator chaining, but gives more details.
> >> 2. Gabor's proposal [3] aims to make the discussion of object reuse independent of operator chaining, which I think is a very good idea because it is not transparent to the user when function chaining happens. Gabor formulated four questions to answer what users can do with, and expect from, objects that their functions receive or emit. In order to make the answers to these questions independent of function chaining and still keep the contracts as defined by the current documentation, we have to default to rather restrictive rules. For instance, functions must always emit new object instances when object reuse is disabled. These strict rules would, for example, also require DataSourceFunctions to copy all records which they receive from an InputFormat (see FLINK-3335). IMO, the strict guarantees make the disableObjectReuse mode harder to use and reason about than the enableObjectReuse mode, whereas the opposite should be the case.
> >>
> >> I would like to suggest a third option. Like Gabor, I think the rules should be independent of function chaining, and I would like to break them down into a handful of easy rules. However, I think we should loosen the guarantees for user functions under disableObjectReuse mode a bit.
> >>
> >> Right now, the documentation states that under disableObjectReuse mode, input objects are not changed across function calls. Hence, users can remember these objects across function calls and their values will not change. I propose to give this guarantee only within function calls, and only for objects which are not emitted. Hence, this rule only applies to functions that can consume multiple values through an iterator, such as GroupReduce, CoGroup, or MapPartition. In disableObjectReuse mode, these functions are allowed to remember the values, e.g., in a collection, and their values will not change when the iterator is forwarded. Once the function call returns, the values might change. Since functions with iterators cannot be directly chained, it will be safe to emit the same object instance several times (hence FLINK-3335 would become invalid).
> >>
> >> The difference to the current guarantees is that input objects become invalid after the function call returns. Since the disableObjectReuse mode was mainly introduced to allow for caching objects across iterator calls within a GroupReduceFunction or CoGroupFunction (not across function calls), I think this is a reasonable restriction.
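To make the iterator case concrete, here is a minimal sketch (class and types chosen just for illustration) of a GroupReduceFunction that caches its input objects. Under disableObjectReuse with the rules proposed above, the cached values are stable for the duration of the reduce() call; under enableObjectReuse, every cached reference may alias the same recycled instance.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class CachingReducer
            implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values,
                           Collector<Tuple2<String, Integer>> out) {
            // Remember the values of the group, e.g., to process them together.
            List<Tuple2<String, Integer>> cache = new ArrayList<>();
            for (Tuple2<String, Integer> v : values) {
                // Safe in disableObjectReuse mode: each iterator step yields a
                // fresh instance. In enableObjectReuse mode, all entries may end
                // up pointing to one and the same reused object.
                cache.add(v);
            }
            for (Tuple2<String, Integer> v : cache) {
                out.collect(v);
            }
        }
    }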
> >> tl;dr:
> >>
> >> If we want to make the documentation of object reuse independent of chaining, we have to
> >>
> >> - EITHER give tighter guarantees / be more restrictive than now and update the internals, which might lead to performance regressions. This would be in line with the current documentation but would, IMO, somewhat defeat the purpose of the disableObjectReuse mode.
> >>
> >> - OR give weaker guarantees, which breaks with the current documentation, but would not affect performance and would, IMO, be easier for users to follow.
> >>
> >> Greg and Gabor, please correct me if I did not get your points right or missed something.
> >>
> >> What do others think?
> >>
> >> Fabian
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
> >>
> >> [2] https://issues.apache.org/jira/browse/FLINK-3333?focusedCommentId=15139151
> >>
> >> [3] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg
> >
> > --------------------------
> > Ken Krugler
> > +1 530-210-6378
> > http://www.scaleunlimited.com
> > custom big data solutions & training
> > Hadoop, Cascading, Cassandra & Solr