Regarding the scope of the object-reuse setting, I agree with Greg.
It would be very nice if we could specify the object-reuse mode for each
user function.

Greg, do you want to open a JIRA for that so that we can continue the
discussion there?

2016-02-24 12:07 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Hi everybody,
>
> thanks for your input.
>
> I sketched a proposal for updated object-reuse semantics and
> documentation, based on Gabor's proposal (1), Greg's input, and the changed
> semantics that I discussed earlier in this thread.
>
> -->
> https://docs.google.com/document/d/1jpPr2UuWlqq1iIDIo_1kmPL9QjA-sXAC9wkj-hE4PAc/edit#
>
> Looking forward to your comments.
>
> Fabian
> (1)
> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
>
>
> 2016-02-20 13:04 GMT+01:00 Gábor Gévay <gga...@gmail.com>:
>
>> Thanks, Ken! I was wondering how other systems handle these issues.
>>
>> Fortunately, the deep copy - shallow copy problem doesn't arise in
>> Flink: when we copy an object, it is always a deep copy (at least, I
>> hope so :)).
>>
>> Best,
>> Gábor
>>
>>
>>
>> 2016-02-19 22:29 GMT+01:00 Ken Krugler <kkrugler_li...@transpac.com>:
>> > Not sure how useful this is, but we'd run into similar issues with
>> > Cascading over the years.
>> >
>> > This wasn't an issue for input data, as Cascading "locks" the Tuple
>> > such that attempts to modify it will fail.
>> >
>> > And in general Hadoop always re-uses the data container being passed to
>> > operations, so you quickly learn to not cache those :)
>> >
>> > When trying to re-use a Tuple as the output in an operation, things get
>> > a bit more complicated.
>> >
>> > If the Tuple only contains primitive types, then there's no issue as
>> > the (effectively) shallow copy created by the execution platform doesn't
>> > create a problem.
>> >
>> > If the Tuple contains an object (e.g. a nested Tuple) then there were
>> > situations where a deep copy would need to be made before passing the
>> > Tuple to the operation's output collector.
>> >
>> > For example, if the next (chained) operation was a map-side aggregator,
>> > then a shallow copy of the Tuple would be cached. If there's a
>> > non-primitive object then changes to this in the upstream operation
>> > obviously bork the cached data.
>> >
>> > Net-net is that we wanted a way to find out, from inside an
>> > operation, whether we needed to make a deep copy of the output Tuple. But
>> > that doesn't exist (yet), so we have some utility code to check if a deep
>> > copy is needed (non-primitive types), and if so then it auto-clones the
>> > Tuple. Which isn't very efficient, but for most of our workflows we only
>> > have primitive types.
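
To make the "check, then auto-clone" idea concrete, here is a minimal
sketch in plain Java. It is not Cascading's API: the tuple is modelled as a
List<Object>, and TupleCopySketch, needsDeepCopy, deepCopy and safeToEmit
are hypothetical names chosen for illustration. It assumes nested lists are
the only non-scalar field type and treats every Number as immutable, which
is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Cascading's API: a "tuple" is modelled as a List<Object>.
class TupleCopySketch {

    /** Simplification: null, any Number, String, Boolean and Character count as immutable. */
    static boolean isImmutableScalar(Object o) {
        return o == null || o instanceof Number || o instanceof String
                || o instanceof Boolean || o instanceof Character;
    }

    /** True if at least one field is not an immutable scalar, so a shallow copy would alias it. */
    static boolean needsDeepCopy(List<Object> tuple) {
        for (Object field : tuple) {
            if (!isImmutableScalar(field)) {
                return true;
            }
        }
        return false;
    }

    /** Copies nested lists recursively; scalar fields are shared because they cannot change. */
    static List<Object> deepCopy(List<Object> tuple) {
        List<Object> out = new ArrayList<>(tuple.size());
        for (Object field : tuple) {
            if (field instanceof List) {
                @SuppressWarnings("unchecked")
                List<Object> nested = (List<Object>) field;
                out.add(deepCopy(nested));
            } else if (isImmutableScalar(field)) {
                out.add(field);
            } else {
                throw new IllegalArgumentException("no copy strategy for " + field.getClass());
            }
        }
        return out;
    }

    /** Clones only when needed; scalar-only tuples are handed on unchanged. */
    static List<Object> safeToEmit(List<Object> tuple) {
        return needsDeepCopy(tuple) ? deepCopy(tuple) : tuple;
    }

    public static void main(String[] args) {
        List<Object> inner = new ArrayList<>();
        inner.add("a");
        List<Object> tuple = new ArrayList<>();
        tuple.add(1);
        tuple.add(inner);

        List<Object> emitted = safeToEmit(tuple);
        inner.set(0, "changed");      // upstream mutates the nested field after emitting
        System.out.println(emitted);  // prints [1, [a]]
    }
}
```

The emitted tuple is unaffected by the later change because the nested list
was copied before it was handed on. A scalar-only tuple skips the copy
entirely, which is the cheap path Ken describes for most workflows.
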
>> >
>> > -- Ken
>> >
>> >> From: Fabian Hueske
>> >> Sent: February 17, 2016 9:17:27am PST
>> >> To: dev@flink.apache.org
>> >> Subject: Guarantees for object reuse modes and documentation
>> >>
>> >> Hi,
>> >>
>> >>
>> >>
>> >> Flink's DataSet API features a configuration parameter called
>> >> enableObjectReuse(). If activated, Flink's runtime will create fewer
>> >> objects, which results in better performance and lower garbage
>> >> collection overhead. Depending on whether the configuration switch is
>> >> enabled or not, user functions may or may not perform certain
>> >> operations on objects they receive from Flink or emit to Flink.
>> >>
>> >>
>> >>
>> >> At the moment, there are quite a few open issues and discussions going
>> >> on about the object reuse mode, including the JIRA issues FLINK-3333,
>> >> FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and FLINK-3291.
>> >>
>> >>
>> >>
>> >> IMO, the most important issue is FLINK-3333, which is about improving
>> >> the documentation of the object reuse mode. The current version [1] is
>> >> ambiguous and includes details about operator chaining which are hard
>> >> to understand and to reason about for users. Hence it is not very clear
>> >> which guarantees Flink gives for objects in user functions under which
>> >> conditions. This documentation needs to be improved and I think this
>> >> should happen together with the 1.0 release.
>> >>
>> >>
>> >>
>> >> Greg and Gabor proposed two new versions:
>> >>
>> >> 1. Greg's version [2] improves and clarifies the current documentation
>> >> without significantly changing the semantics. It also discusses
>> >> operator chaining, but gives more details.
>> >> 2. Gabor's proposal [3] aims to make the discussion of object reuse
>> >> independent of operator chaining, which I think is a very good idea
>> >> because it is not transparent to the user when function chaining
>> >> happens. Gabor formulated four questions to answer what users can do
>> >> with and expect from objects that they receive in or emit from a
>> >> function. In order to make the answers to these questions independent
>> >> of function chaining and still keep the contracts as defined by the
>> >> current documentation, we have to default to rather restrictive rules.
>> >> For instance, functions must always emit new object instances in case
>> >> of disabled object reuse mode. These strict rules would for example
>> >> also require DataSourceFunctions to copy all records which they receive
>> >> from an InputFormat (see FLINK-3335). IMO, the strict guarantees make
>> >> the disableObjectReuse mode harder to use and reason about than the
>> >> enableObjectReuse mode, whereas the opposite should be the case.
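
The reason behind the "always emit new instances" rule can be sketched
without Flink. In the simulation below, Collector, BufferingCollector and
Rec are local stand-ins (not Flink classes) for a chained downstream
operator that keeps references to the records it receives. It only
illustrates the aliasing hazard and says nothing about how Flink's runtime
actually chains or copies.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: Collector and Rec are local stand-ins, not Flink classes.
class EmitSketch {

    /** Mutable record standing in for a user type. */
    static final class Rec {
        int value;
        Rec(int value) { this.value = value; }
    }

    interface Collector {
        void collect(Rec record);
    }

    /** Downstream stand-in that stores references to the records it receives. */
    static final class BufferingCollector implements Collector {
        private final List<Rec> buffer = new ArrayList<>();

        @Override
        public void collect(Rec record) { buffer.add(record); }

        /** Reads the current field value of every buffered reference. */
        List<Integer> values() {
            List<Integer> out = new ArrayList<>();
            for (Rec r : buffer) { out.add(r.value); }
            return out;
        }
    }

    /** Emits one instance repeatedly, overwriting its field between calls. */
    static void emitReusing(int[] values, Collector out) {
        Rec reused = new Rec(0);
        for (int v : values) {
            reused.value = v;
            out.collect(reused);
        }
    }

    /** Emits a new instance per value, as the strict rules would require. */
    static void emitFresh(int[] values, Collector out) {
        for (int v : values) { out.collect(new Rec(v)); }
    }

    public static void main(String[] args) {
        BufferingCollector a = new BufferingCollector();
        emitReusing(new int[] {1, 2, 3}, a);
        System.out.println(a.values()); // prints [3, 3, 3]

        BufferingCollector b = new BufferingCollector();
        emitFresh(new int[] {1, 2, 3}, b);
        System.out.println(b.values()); // prints [1, 2, 3]
    }
}
```
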
>> >>
>> >>
>> >>
>> >> I would like to suggest a third option. Like Gabor, I think the rules
>> >> should be independent of function chaining and I would like to break
>> >> them down into a handful of easy rules. However, I think we should
>> >> loosen up the guarantees for user functions under disableObjectReuse
>> >> mode a bit.
>> >>
>> >> Right now, the documentation states that under disableObjectReuse mode,
>> >> input objects are not changed across function calls. Hence users can
>> >> remember these objects across function calls and their value will not
>> >> change. I propose to give this guarantee only within function calls
>> >> and only for objects which are not emitted. Hence, this rule only
>> >> applies to functions that can consume multiple values through an
>> >> iterator, such as GroupReduce, CoGroup, or MapPartition. In
>> >> disableObjectReuse mode, these functions are allowed to remember the
>> >> values, e.g., in a collection, and their value will not change when the
>> >> iterator is forwarded. Once the function call returns, the values might
>> >> change. Since functions with iterators cannot be directly chained, it
>> >> will be safe to emit the same object instance several times (hence
>> >> FLINK-3335 would become invalid).
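
A small simulation of the "remember values from the iterator" case, in
plain Java. Nothing here is a Flink class: Rec, records() and remember()
are hypothetical stand-ins, with records() playing the role of the input
iterator that either hands out fresh instances or one shared instance, and
remember() playing the role of a group function body that caches its
inputs in a collection.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simulation only: none of these types are Flink classes.
class ReuseSketch {

    /** Mutable record standing in for a user type. */
    static final class Rec {
        int value;
        Rec(int value) { this.value = value; }
        Rec copy() { return new Rec(value); }
    }

    /** Hands out one shared, overwritten instance (reuse) or a fresh instance per element. */
    static Iterator<Rec> records(final int[] values, final boolean reuse) {
        return new Iterator<Rec>() {
            private final Rec shared = new Rec(0);
            private int pos = 0;

            @Override
            public boolean hasNext() { return pos < values.length; }

            @Override
            public Rec next() {
                if (!hasNext()) { throw new NoSuchElementException(); }
                int v = values[pos++];
                if (reuse) {
                    shared.value = v;
                    return shared;
                }
                return new Rec(v);
            }
        };
    }

    /** Caches every input (optionally copying it first), then reads the cached values back. */
    static List<Integer> remember(Iterator<Rec> in, boolean copyBeforeCaching) {
        List<Rec> cached = new ArrayList<>();
        while (in.hasNext()) {
            Rec r = in.next();
            cached.add(copyBeforeCaching ? r.copy() : r);
        }
        List<Integer> seen = new ArrayList<>();
        for (Rec r : cached) { seen.add(r.value); }
        return seen;
    }

    public static void main(String[] args) {
        int[] group = {1, 2, 3};
        System.out.println(remember(records(group, false), false)); // prints [1, 2, 3]
        System.out.println(remember(records(group, true), false));  // prints [3, 3, 3]
        System.out.println(remember(records(group, true), true));   // prints [1, 2, 3]
    }
}
```

The first call corresponds to the guarantee proposed for disabled reuse
(cached values stay valid while the iterator advances), the second shows
what caching by reference does when instances are reused, and the third
shows that copying before caching restores the expected values.
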
>> >>
>> >>
>> >>
>> >> The difference to the current guarantees is that input objects become
>> >> invalid after the function call returns. Since the disableObjectReuse
>> >> mode was mainly introduced to allow for caching objects across iterator
>> >> calls within a GroupReduceFunction or CoGroupFunction (not across
>> >> function calls), I think this is a reasonable restriction.
>> >>
>> >>
>> >>
>> >> tl;dr
>> >>
>> >> If we want to make the documentation of object reuse independent of
>> >> chaining we have to
>> >>
>> >> - EITHER give tighter guarantees / be more restrictive than now and
>> >> update internals, which might lead to performance regressions. This
>> >> would be in line with the current documentation but somewhat defeat the
>> >> purpose of the disableObjectReuse mode, IMO.
>> >>
>> >> - OR give weaker guarantees, which breaks with the current
>> >> documentation, but would not affect performance and would be easier to
>> >> follow for users, IMO.
>> >>
>> >>
>> >> Greg and Gabor, please correct me if I did not get your points right or
>> >> missed something.
>> >>
>> >> What do others think?
>> >>
>> >>
>> >> Fabian
>> >>
>> >>
>> >>
>> >> [1] https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
>> >>
>> >> [2] https://issues.apache.org/jira/browse/FLINK-3333?focusedCommentId=15139151
>> >>
>> >> [3] https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg
>> >
>> > --------------------------
>> > Ken Krugler
>> > +1 530-210-6378
>> > http://www.scaleunlimited.com
>> > custom big data solutions & training
>> > Hadoop, Cascading, Cassandra & Solr
>> >
>>
>
>
