Gabor and Greg gave some good comments on the proposal.

If there is no more feedback, I'll go ahead and open a PR to update the
documentation tomorrow.

Thanks, Fabian

2016-02-24 12:24 GMT+01:00 Fabian Hueske <>:

> Regarding the scope of the object-reuse setting, I agree with Greg.
> It would be very nice if we could specify the object-reuse mode for each
> user function.
> Greg, do you want to open a JIRA for that so that we can continue the
> discussion there?
> 2016-02-24 12:07 GMT+01:00 Fabian Hueske <>:
>> Hi everybody,
>> thanks for your input.
>> I sketched a proposal for updated object-reuse semantics and
>> documentation, based on Gabor's proposal (1), Greg's input, and the changed
>> semantics that I discussed earlier in this thread.
>> -->
>> Looking forward to your comments.
>> Fabian
>> (1)
>> 2016-02-20 13:04 GMT+01:00 Gábor Gévay <>:
>>> Thanks, Ken! I was wondering how other systems handle these issues.
>>> Fortunately, the deep copy - shallow copy problem doesn't arise in
>>> Flink: when we copy an object, it is always a deep copy (at least, I
>>> hope so :)).
>>> Best,
>>> Gábor
>>> 2016-02-19 22:29 GMT+01:00 Ken Krugler <>:
>>> > Not sure how useful this is, but we'd run into similar issues with
>>> > Cascading over the years.
>>> >
>>> > This wasn't an issue for input data, as Cascading "locks" the Tuple
>>> > such that attempts to modify it will fail.
>>> >
>>> > And in general Hadoop always re-uses the data container being passed
>>> > to operations, so you quickly learn to not cache those :)
>>> >
>>> > When trying to re-use a Tuple as the output in an operation, things
>>> > get a bit more complicated.
>>> >
>>> > If the Tuple only contains primitive types, then there's no issue as
>>> > the (effectively) shallow copy created by the execution platform
>>> > doesn't create a problem.
>>> >
>>> > If the Tuple contains an object (e.g. a nested Tuple) then there were
>>> > situations where a deep copy would need to be made before passing the
>>> > Tuple to the operation's output collector.
>>> >
>>> > For example, if the next (chained) operation was a map-side
>>> > aggregator, then a shallow copy of the Tuple would be cached. If
>>> > there's a non-primitive object then changes to this in the upstream
>>> > operation obviously bork the cached data.
>>> >
>>> > Net-net is that we wanted a way to find out, from inside an
>>> > operation, whether we needed to make a deep copy of the output Tuple.
>>> > But that doesn't exist (yet), so we have some utility code to check if
>>> > a deep copy is needed (non-primitive types), and if so then it
>>> > auto-clones the Tuple. Which isn't very efficient, but for most of our
>>> > workflows we only have primitive types.
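
The check-then-clone utility described above could look roughly like the following. This is only a sketch in plain Java, not Cascading code: the tuple is modelled as a `List<Object>`, and `TupleCopySketch`, `needsDeepCopy`, `deepCopy` and `prepareForEmit` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a tuple utility; a tuple is modelled as List<Object>.
final class TupleCopySketch {

    // Boxed primitives and Strings are immutable, so sharing them is safe.
    static boolean isImmutableValue(Object field) {
        return field == null
                || field instanceof String
                || field instanceof Boolean
                || field instanceof Character
                || field instanceof Byte
                || field instanceof Short
                || field instanceof Integer
                || field instanceof Long
                || field instanceof Float
                || field instanceof Double;
    }

    // A deep copy is only needed if some field is not an immutable value.
    static boolean needsDeepCopy(List<Object> tuple) {
        for (Object field : tuple) {
            if (!isImmutableValue(field)) {
                return true;
            }
        }
        return false;
    }

    // Recursively copies nested tuples; immutable fields are shared.
    static List<Object> deepCopy(List<Object> tuple) {
        List<Object> copy = new ArrayList<>(tuple.size());
        for (Object field : tuple) {
            if (field instanceof List) {
                @SuppressWarnings("unchecked")
                List<Object> nested = (List<Object>) field;
                copy.add(deepCopy(nested));
            } else if (isImmutableValue(field)) {
                copy.add(field);
            } else {
                throw new IllegalArgumentException(
                        "No copy strategy for " + field.getClass().getName());
            }
        }
        return copy;
    }

    // Clone only when required; otherwise hand the tuple through unchanged.
    static List<Object> prepareForEmit(List<Object> tuple) {
        return needsDeepCopy(tuple) ? deepCopy(tuple) : tuple;
    }

    public static void main(String[] args) {
        List<Object> nested = new ArrayList<>();
        nested.add(1);
        List<Object> tuple = new ArrayList<>();
        tuple.add("key");
        tuple.add(nested);

        List<Object> emitted = prepareForEmit(tuple);
        nested.set(0, 99); // upstream mutation after emitting
        System.out.println(emitted); // the emitted copy is unaffected
    }
}
```

The cost mentioned above shows up in `deepCopy`: every tuple with a nested object is walked and re-allocated, while tuples made only of primitive values pass through without any copy.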
>>> >
>>> > -- Ken
>>> >
>>> >> From: Fabian Hueske
>>> >> Sent: February 17, 2016 9:17:27am PST
>>> >> To:
>>> >> Subject: Guarantees for object reuse modes and documentation
>>> >>
>>> >> Hi,
>>> >>
>>> >>
>>> >>
>>> >> Flink's DataSet API features a configuration parameter called
>>> >> enableObjectReuse(). If activated, Flink's runtime will create fewer
>>> >> objects, which results in better performance and lower garbage
>>> >> collection overhead. Depending on whether the configuration switch is
>>> >> enabled or not, user functions may or may not perform certain
>>> >> operations on objects they receive from Flink or emit to Flink.
>>> >>
>>> >>
>>> >>
>>> >> At the moment, there are quite a few open issues and discussions
>>> >> going on about the object reuse mode, including the JIRA issues
>>> >> FLINK-3333, FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and
>>> >> FLINK-3291.
>>> >>
>>> >>
>>> >>
>>> >> IMO, the most important issue is FLINK-3333, which is about improving
>>> >> the documentation of the object reuse mode. The current version [1] is
>>> >> ambiguous and includes details about operator chaining which are hard
>>> >> to understand and to reason about for users. Hence it is not very
>>> >> clear which guarantees Flink gives for objects in user functions under
>>> >> which conditions. This documentation needs to be improved and I think
>>> >> this should happen together with the 1.0 release.
>>> >>
>>> >>
>>> >>
>>> >> Greg and Gabor proposed two new versions:
>>> >>
>>> >> 1. Greg's version [2] improves and clarifies the current
>>> >> documentation without significantly changing the semantics. It also
>>> >> discusses operator chaining, but gives more details.
>>> >> 2. Gabor's proposal [3] aims to make the discussion of object reuse
>>> >> independent of operator chaining which I think is a very good idea
>>> >> because it is not transparent to the user when function chaining
>>> >> happens. Gabor formulated four questions to answer what users can do
>>> >> with and expect from objects that they received or emitted from a
>>> >> function. In order to make the answers to these questions independent
>>> >> of function chaining and still keep the contracts as defined by the
>>> >> current documentation, we have to default to rather restrictive rules.
>>> >> For instance, functions must always emit new object instances in case
>>> >> of disabled object reuse mode. These strict rules would for example
>>> >> also require DataSourceFunctions to copy all records which they
>>> >> receive from an InputFormat (see FLINK-3335). IMO, the strict
>>> >> guarantees make the disableObjectReuse mode harder to use and reason
>>> >> about than the enableObjectReuse mode whereas the opposite should be
>>> >> the case.
>>> >>
>>> >>
>>> >>
>>> >> I would like to suggest a third option. Like Gabor, I think the rules
>>> >> should be independent of function chaining and I would like to break
>>> >> them down into a handful of easy rules. However, I think we should
>>> >> loosen up the guarantees for user functions under disableObjectReuse
>>> >> mode a bit.
>>> >>
>>> >> Right now, the documentation states that under disableObjectReuse
>>> >> mode, input objects are not changed across function calls. Hence users
>>> >> can remember these objects across function calls and their value will
>>> >> not change. I propose to give this guarantee only within function
>>> >> calls and only for objects which are not emitted. Hence, this rule
>>> >> only applies to functions that can consume multiple values through an
>>> >> iterator such as GroupReduce, CoGroup, or MapPartition. In
>>> >> disableObjectReuse mode, these functions are allowed to remember the
>>> >> values, e.g., in a collection, and their value will not change when
>>> >> the iterator is forwarded. Once the function call returns, the values
>>> >> might change. Since functions with iterators cannot be directly
>>> >> chained, it will be safe to emit the same object instance several
>>> >> times (hence FLINK-3335 would become invalid).
>>> >>
>>> >>
>>> >>
>>> >> The difference to the current guarantees is that input objects become
>>> >> invalid after the function call returns. Since the disableObjectReuse
>>> >> mode was mainly introduced to allow for caching objects across
>>> >> iterator calls within a GroupReduceFunction or CoGroupFunction (not
>>> >> across function calls), I think this is a reasonable restriction.
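
To make the hazard concrete, here is a small plain-Java simulation of a group-style function that remembers its inputs in a collection. It does not use Flink at all: `ObjectReuseSketch`, `Rec`, `records` and `rememberAll` are hypothetical names, and the `reuse` flag only imitates a runtime that hands out one shared instance per iterator versus a fresh instance per record.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java simulation; no Flink classes are involved.
final class ObjectReuseSketch {

    // Hypothetical mutable record type.
    static final class Rec {
        int value;
        Rec(int value) { this.value = value; }
        Rec copy() { return new Rec(value); }
    }

    // Yields the values 1..n. With reuse == true, a single shared instance
    // is overwritten on every next() call; otherwise each call allocates.
    static Iterator<Rec> records(final int n, final boolean reuse) {
        return new Iterator<Rec>() {
            private final Rec shared = new Rec(0);
            private int next = 1;

            @Override
            public boolean hasNext() { return next <= n; }

            @Override
            public Rec next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int v = next++;
                if (reuse) {
                    shared.value = v;
                    return shared;
                }
                return new Rec(v);
            }
        };
    }

    // Imitates a function that caches all inputs while the iterator is
    // forwarded, then reads the cached values back.
    static List<Integer> rememberAll(Iterator<Rec> input, boolean copyBeforeCaching) {
        List<Rec> cache = new ArrayList<>();
        while (input.hasNext()) {
            Rec r = input.next();
            cache.add(copyBeforeCaching ? r.copy() : r);
        }
        List<Integer> seen = new ArrayList<>();
        for (Rec r : cache) {
            seen.add(r.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(rememberAll(records(3, false), false)); // [1, 2, 3]
        System.out.println(rememberAll(records(3, true), false));  // [3, 3, 3]
        System.out.println(rememberAll(records(3, true), true));   // [1, 2, 3]
    }
}
```

The first call corresponds to the guarantee discussed above for the non-reuse mode: cached values stay stable while the iterator is forwarded. The second shows what caching without a copy does when instances are reused, and the third shows the explicit copy a user function would need in that case.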
>>> >>
>>> >>
>>> >>
>>> >> tl;dr;
>>> >>
>>> >> If we want to make the documentation of object reuse independent of
>>> >> chaining we have to
>>> >>
>>> >> - EITHER give tighter guarantees / be more restrictive than now and
>>> >> update internals, which might lead to performance regressions. This
>>> >> would be in line with the current documentation but somewhat defeat
>>> >> the purpose of the disableObjectReuse mode, IMO.
>>> >>
>>> >> - OR give weaker guarantees, which breaks with the current
>>> >> documentation, but would not affect performance and would be easier
>>> >> for users to follow, IMO.
>>> >>
>>> >>
>>> >> Greg and Gabor, please correct me if I did not get your points right
>>> >> or missed something.
>>> >>
>>> >> What do others think?
>>> >>
>>> >>
>>> >> Fabian
>>> >>
>>> >>
>>> >>
>>> >> [1]
>>> >>
>>> >>
>>> >> [2]
>>> >>
>>> >>
>>> >> [3]
>>> >>
>>> >
>>> > --------------------------
>>> > Ken Krugler
>>> > +1 530-210-6378
>>> >
>>> > custom big data solutions & training
>>> > Hadoop, Cascading, Cassandra & Solr
>>> >
>>> >
>>> >
>>> >
>>> >
