Hi,


Flink's DataSet API features a configuration parameter called
enableObjectReuse(). If activated, Flink's runtime will create fewer
objects, which results in better performance and lower garbage collection
overhead. Depending on whether the configuration switch is enabled or not,
user functions may or may not perform certain operations on objects they
receive from Flink or emit to Flink.
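
To illustrate why it matters whether a function holds on to the objects it
receives, here is a minimal sketch in plain Java. It does not use Flink at
all: Rec, records(), and rememberThenSum() are made-up names, and the
iterator only mimics a runtime that either hands out a fresh object per
record or overwrites a single shared instance.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Plain-Java illustration, no Flink dependency. All names are hypothetical.
class ObjectReuseSketch {

    // A mutable record, standing in for a value type handled by the runtime.
    static final class Rec {
        int value;
        Rec(int value) { this.value = value; }
    }

    // Iterates over the values 1..n. With reuse == true, the same Rec
    // instance is handed out every time and only its field is overwritten.
    static Iterator<Rec> records(final int n, final boolean reuse) {
        return new Iterator<Rec>() {
            private int nextValue = 1;
            private final Rec shared = new Rec(0);

            @Override
            public boolean hasNext() { return nextValue <= n; }

            @Override
            public Rec next() {
                if (!hasNext()) throw new NoSuchElementException();
                int v = nextValue++;
                if (reuse) {
                    shared.value = v;   // overwrite the shared instance
                    return shared;
                }
                return new Rec(v);      // fresh instance per record
            }
        };
    }

    // A "user function" that remembers every input object in a collection
    // and only reads the remembered objects after the input is exhausted.
    static int rememberThenSum(Iterator<Rec> in) {
        List<Rec> remembered = new ArrayList<>();
        while (in.hasNext()) remembered.add(in.next());
        int sum = 0;
        for (Rec r : remembered) sum += r.value;
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(rememberThenSum(records(3, false))); // prints 6
        System.out.println(rememberThenSum(records(3, true)));  // prints 9
    }
}
```

With fresh objects the function sees 1 + 2 + 3. With the shared instance,
all three remembered references point to the same object, which by then
holds the last value, so the sum is 3 + 3 + 3.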



At the moment, there are quite a few open issues and discussions going on
about the object reuse mode, including the JIRA issues FLINK-3333,
FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and FLINK-3291.



IMO, the most important issue is FLINK-3333 which is about improving the
documentation of the object reuse mode. The current version [1] is
ambiguous and includes details about operator chaining which are hard to
understand and to reason about for users. Hence it is not very clear which
guarantees Flink gives for objects in user functions under which
conditions. This documentation needs to be improved and I think this should
happen together with the 1.0 release.



Greg and Gabor proposed two new versions:

1. Greg's version [2] improves and clarifies the current documentation
without significantly changing the semantics. It also discusses operator
chaining, but gives more details.
2. Gabor's proposal [3] aims to make the discussion of object reuse
independent of operator chaining which I think is a very good idea because
it is not transparent to the user when function chaining happens. Gabor
formulated four questions to answer what users can do with and expect from
objects that a function received or emitted. In order to make the
answers to these questions independent of function chaining and still keep
the contracts as defined by the current documentation, we have to default
to rather restrictive rules. For instance, functions must always emit new
object instances in case of disabled object reuse mode. These strict rules
would for example also require DataSourceFunctions to copy all records
which they receive from an InputFormat (see FLINK-3335). IMO, the strict
guarantees make the disableObjectReuse mode harder to use and reason about
than the enableObjectReuse mode whereas the opposite should be the case.



I would like to suggest a third option. Like Gabor, I think the rules
should be independent of function chaining, and I would like to break them
down into a handful of easy rules. However, I think we should loosen up the
guarantees for user functions under disableObjectReuse mode a bit.

Right now, the documentation states that under disableObjectReuse mode,
input objects are not changed across function calls. Hence users can
remember these objects across function calls and their values will not
change. I propose to give this guarantee only within function calls and
only for objects which are not emitted. Hence, this rule only applies to
functions that can consume multiple values through an iterator such as
GroupReduce, CoGroup, or MapPartition. In disableObjectReuse mode,
these functions are allowed to remember the values, e.g., in a collection,
and their values will not change when the iterator is advanced. Once the
function call returns, the values might change. Since functions with
iterators cannot be directly chained, it will be safe to emit the same
object instance several times (hence FLINK-3335 would become invalid).
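
The proposed rule can be sketched in plain Java as follows. This is only an
illustration of the contract, not Flink code: Rec, reduceGroup(), run(),
and keptAcrossCalls are made-up names, and the object pool in run() is an
assumption about what a runtime would be allowed to do under this rule, not
a description of what Flink actually does.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the proposed contract. All names are hypothetical.
class ProposedContractSketch {

    static final class Rec {
        int value;
        Rec(int value) { this.value = value; }
    }

    // Objects the "user function" keeps beyond the end of a call.
    static final List<Rec> keptAcrossCalls = new ArrayList<>();

    // Stand-in for a GroupReduce-style function with an iterable input.
    static int reduceGroup(Iterable<Rec> group) {
        List<Rec> buffer = new ArrayList<>();
        for (Rec r : group) buffer.add(r);  // covered: used within this call
        keptAcrossCalls.addAll(buffer);     // not covered: outlives the call
        int sum = 0;
        for (Rec r : buffer) sum += r.value;
        return sum;
    }

    // Stand-in for the runtime: objects are distinct within one call, but
    // the same pool is overwritten to serve the next call.
    static int[] run(int[][] groups) {
        List<Rec> pool = new ArrayList<>();
        int[] sums = new int[groups.length];
        for (int g = 0; g < groups.length; g++) {
            List<Rec> group = new ArrayList<>();
            for (int i = 0; i < groups[g].length; i++) {
                if (i == pool.size()) pool.add(new Rec(0));
                pool.get(i).value = groups[g][i];
                group.add(pool.get(i));
            }
            sums[g] = reduceGroup(group);
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] sums = run(new int[][] {{1, 2}, {10, 20}});
        System.out.println(sums[0] + " " + sums[1]);          // prints 3 30
        System.out.println(keptAcrossCalls.get(0).value);     // prints 10
    }
}
```

Buffering inside the call yields the correct sums for both groups. The
object remembered from the first call originally held 1, but it reads 10
after the second call because the pool was overwritten in between.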



The difference from the current guarantees is that input objects become
invalid after the function call has returned. Since the disableObjectReuse
mode was mainly introduced to allow for caching objects across iterator
calls within a GroupReduceFunction or CoGroupFunction (not across function
calls), I think this is a reasonable restriction.



tl;dr:

If we want to make the documentation of object reuse independent of
chaining we have to

- EITHER give tighter guarantees (i.e., be more restrictive than now) and
update internals, which might lead to performance regressions. This would be
in line with the current documentation but somewhat defeat the purpose of
the disableObjectReuse mode, IMO.

- OR give weaker guarantees, which breaks with the current documentation
but would not affect performance and would be easier to follow for users, IMO.


Greg and Gabor, please correct me if I did not get your points right or
missed something.

What do others think?


Fabian



[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior

[2]
https://issues.apache.org/jira/browse/FLINK-3333?focusedCommentId=15139151

[3]
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg
