Thanks, Ken! I was wondering how other systems handle these issues.

Fortunately, the deep copy vs. shallow copy problem doesn't arise in
Flink: when we copy an object, it is always a deep copy (at least, I
hope so :)).

Best,
Gábor



2016-02-19 22:29 GMT+01:00 Ken Krugler <kkrugler_li...@transpac.com>:
> Not sure how useful this is, but we'd run into similar issues with Cascading 
> over the years.
>
> This wasn't an issue for input data, as Cascading "locks" the Tuple such that 
> attempts to modify it will fail.
>
> And in general Hadoop always re-uses the data container being passed to 
> operations, so you quickly learn to not cache those :)
>
> When trying to re-use a Tuple as the output in an operation, things get a bit 
> more complicated.
>
> If the Tuple only contains primitive types, then there's no issue as the 
> (effectively) shallow copy created by the execution platform doesn't create a 
> problem.
>
> If the Tuple contains an object (e.g. a nested Tuple), then there are 
> situations where a deep copy needs to be made before passing the Tuple 
> to the operation's output collector.
>
> For example, if the next (chained) operation was a map-side aggregator, then 
> a shallow copy of the Tuple would be cached. If there's a non-primitive 
> object then changes to this in the upstream operation obviously bork the 
> cached data.
>
> Net-net is that we wanted a way to find out, from inside an operation, 
> whether we needed to make a deep copy of the output Tuple. But that doesn't 
> exist (yet), so we have some utility code to check if a deep copy is needed 
> (non-primitive types), and if so then it auto-clones the Tuple. Which isn't 
> very efficient, but for most of our workflows we only have primitive types.
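The hazard Ken describes can be illustrated with a minimal, self-contained sketch (the `Pair` class below is hypothetical, not a Cascading or Flink type): a shallow copy of a tuple-like container shares its nested objects, so a later mutation of the original corrupts the cached copy, while a deep copy stays intact.

```java
import java.util.ArrayList;
import java.util.List;

public class CopyDemo {
    // Hypothetical tuple-like container: one primitive field, one nested list.
    static class Pair {
        int key;
        List<Integer> values;

        Pair(int key, List<Integer> values) {
            this.key = key;
            this.values = values;
        }

        // Shallow copy: the nested list is shared with the original.
        Pair shallowCopy() {
            return new Pair(key, values);
        }

        // Deep copy: the nested list is duplicated, so later mutation
        // of the original cannot affect the copy.
        Pair deepCopy() {
            return new Pair(key, new ArrayList<>(values));
        }
    }

    public static void main(String[] args) {
        List<Integer> nested = new ArrayList<>();
        nested.add(1);
        Pair original = new Pair(42, nested);

        Pair cachedShallow = original.shallowCopy();
        Pair cachedDeep = original.deepCopy();

        // The upstream operation mutates the tuple it still holds...
        original.values.add(2);

        // ...which "borks" the shallow-cached data but not the deep copy.
        System.out.println(cachedShallow.values); // [1, 2]
        System.out.println(cachedDeep.values);    // [1]
    }
}
```

Only a tuple containing non-primitive fields needs the deep copy, which is exactly the check Ken's utility code performs.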
>
> -- Ken
>
>> From: Fabian Hueske
>> Sent: February 17, 2016 9:17:27am PST
>> To: dev@flink.apache.org
>> Subject: Guarantees for object reuse modes and documentation
>>
>> Hi,
>>
>>
>>
>> Flink's DataSet API features a configuration parameter called
>> enableObjectReuse(). If activated, Flink's runtime will create fewer
>> objects which results in better performance and lower garbage collection
>> overhead. Depending on whether the configuration switch is enabled or not,
>> user functions may or may not perform certain operations on objects they
>> receive from Flink or emit to Flink.
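For reference, the switch lives on the `ExecutionConfig`; enabling it in a DataSet program looks roughly like this (a sketch against the DataSet API of that era, not a complete program):

```java
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReuseExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Opt in to object reuse: the runtime may hand the same object
        // instances to user functions repeatedly instead of allocating
        // a new one for every record.
        env.getConfig().enableObjectReuse();

        // ... build and execute the DataSet program as usual ...
    }
}
```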
>>
>>
>>
>> At the moment, there are quite a few open issues and discussions going on
>> about the object reuse mode, including the JIRA issues FLINK-3333,
>> FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and FLINK-3291.
>>
>>
>>
>> IMO, the most important issue is FLINK-3333 which is about improving the
>> documentation of the object reuse mode. The current version [1] is
>> ambiguous and includes details about operator chaining which are hard for
>> users to understand and reason about. Hence it is not very clear which
>> guarantees Flink gives for objects in user functions under which
>> conditions. This documentation needs to be improved and I think this should
>> happen together with the 1.0 release.
>>
>>
>>
>> Greg and Gabor proposed two new versions:
>>
>> 1. Greg's version [2] improves and clarifies the current documentation
>> without significantly changing the semantics. It also discusses operator
>> chaining, but gives more details.
>> 2. Gabor's proposal [3] aims to make the discussion of object reuse
>> independent of operator chaining which I think is a very good idea because
>> it is not transparent to the user when function chaining happens. Gabor
>> formulated four questions to answer what users can do with and expect from
>> objects that they received or emitted from a function. In order to make the
>> answers to these questions independent of function chaining and still keep
>> the contracts as defined by the current documentation, we have to default
>> to rather restrictive rules. For instance, functions must always emit new
>> object instances in case of disabled object reuse mode. These strict rules
>> would for example also require DataSourceFunctions to copy all records
>> which they receive from an InputFormat (see FLINK-3335). IMO, the strict
>> guarantees make the disableObjectReuse mode harder to use and reason about
>> than the enableObjectReuse mode whereas the opposite should be the case.
>>
>>
>>
>> I would like to suggest a third option. Like Gabor, I think the rules
>> should be independent of function chaining, and I would like to break them
>> down into a handful of easy rules. However, I think we should loosen up the
>> guarantees for user functions under disableObjectReuse mode a bit.
>>
>> Right now, the documentation states that under disableObjectReuse mode,
>> input objects are not changed across function calls. Hence users can
>> remember these objects across function calls and their value will not
>> change. I propose to give this guarantee only within function calls and
>> only for objects which are not emitted. Hence, this rule only applies to
>> functions that can consume multiple values through an iterator, such as
>> GroupReduce, CoGroup, or MapPartition. In disableObjectReuse mode,
>> these functions are allowed to remember the values, e.g., in a collection,
>> and their value will not change when the iterator is forwarded. Once the
>> function call returns, the values might change. Since functions with
>> iterators cannot be directly chained, it will be safe to emit the same
>> object instance several times (hence FLINK-3335 would become invalid).
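The difference between the two modes can be simulated without Flink. In the toy "runtime" below, a function caches the references it receives from an iterator; with reuse disabled every value arrives in a fresh instance, while with reuse enabled one instance is mutated in place. All names here are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class IteratorReuseDemo {
    // Illustrative record type, not a Flink class.
    static class Record {
        int value;
        Record(int value) { this.value = value; }
    }

    // Simulates a runtime delivering the values 1..3 to a group function
    // that remembers the references it receives. With objectReuse enabled,
    // a single instance is mutated in place between deliveries; otherwise
    // every value arrives in a fresh instance.
    static List<Record> collect(boolean objectReuse) {
        List<Record> cached = new ArrayList<>();
        Record reused = new Record(0);
        for (int v = 1; v <= 3; v++) {
            Record r;
            if (objectReuse) {
                reused.value = v; // runtime overwrites the shared instance
                r = reused;
            } else {
                r = new Record(v); // runtime allocates a new instance
            }
            cached.add(r); // the function remembers the reference
        }
        return cached;
    }

    public static void main(String[] args) {
        // Disabled reuse: cached values stay intact -> 1 2 3
        for (Record r : collect(false)) System.out.print(r.value + " ");
        System.out.println();
        // Enabled reuse: every cached reference points at the same
        // instance, which now holds the last value -> 3 3 3
        for (Record r : collect(true)) System.out.print(r.value + " ");
        System.out.println();
    }
}
```

This is exactly the caching pattern inside a GroupReduceFunction that the disableObjectReuse guarantee is meant to make safe within a single function call.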
>>
>>
>>
>> The difference to the current guarantees is that input objects become
>> invalid after the function call has returned. Since the disableObjectReuse
>> mode was mainly introduced to allow for caching objects across iterator
>> calls within a GroupReduceFunction or CoGroupFunction (not across function
>> calls), I think this is a reasonable restriction.
>>
>>
>>
>> tl;dr:
>>
>> If we want to make the documentation of object reuse independent of
>> chaining we have to
>>
>> - EITHER give tighter guarantees / be more restrictive than now and update
>> internals, which might lead to performance regressions. This would be in line
>> with the current documentation but somewhat defeat the purpose of the
>> disableObjectReuse mode, IMO.
>>
>> - OR give weaker guarantees, which breaks with the current documentation,
>> but would not affect performance and would be easier to follow for users, IMO.
>>
>>
>> Greg and Gabor, please correct me if I did not get your points right or
>> missed something.
>>
>> What do others think?
>>
>>
>> Fabian
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
>>
>> [2]
>> https://issues.apache.org/jira/browse/FLINK-3333?focusedCommentId=15139151
>>
>> [3]
>> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>
>
