Thank you for the reply and for the support!

@Greg, controlling object reuse on a per-operator basis is definitely a good
way to follow up. My first thought would be to keep this proposal slim and
deal with the "default" logic, and have a follow-up effort to make this
controllable per operator.

@Greg When you mention the "surprises" about object reuse in the DataSet
API, what cases and behavior do you have in mind there?

Stephan


On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> +1 for changing the default if so many people encountered problems with
> serialisation costs.
>
> The first two modes don’t require any code changes, correct? Only the last
> one would require changes to the stream input processors.
>
> We should also keep this issue in mind: https://issues.apache.org/
> jira/browse/FLINK-3974 <https://issues.apache.org/jira/browse/FLINK-3974>
> i.e. we always need to make shallow copies of the StreamRecord.
>
> Best,
> Aljoscha
>
> > On 27. Jun 2017, at 21:01, Zhenzhong Xu <flyf...@gmail.com> wrote:
> >
> > Stephan,
> >
> > Fully supporting this FLIP. We originally encountered pretty big
> surprises observing the object copy behavior causing significant
> performance degradation for our massively parallel use case.
> >
> > In our case (and I think this SHOULD be the assumption for
> all streaming use cases), we assume object immutability for all the
> records throughout the processing pipeline. As a result, I don't see a need to
> distinguish different object reuse behaviors for different (chained)
> operators (or, at the very extreme, even the need to support
> COPY_PER_OPERATOR, other than that we probably have to support it for backward
> compatibility). I am also a fan of failing fast if the user asserts incorrect
> assumptions.
> >
> > One piece of feedback on FLIP-21 itself: I am not very clear on the
> difference between the DEFAULT and FULL_REUSE enumeration values. Aren't they exactly
> the same thing in the new proposal? However, the model figures seem to indicate
> they are slightly different. Can you elaborate a bit more?
> >
> > Z.
> >
> >
> > On 2017-06-27 11:14 (-0700), Greg Hogan <c...@greghogan.com <mailto:
> c...@greghogan.com>> wrote:
> >> Hi Stephan,
> >>
> >> Would this be an appropriate time to discuss allowing reuse to be a
> per-operator configuration? Object reuse for chained operators has led to
> considerable surprise for some users of the DataSet API. This came up
> during the rework of the object reuse documentation for the DataSet API.
> With annotations a Function could mark whether input/iterator or
> output/collected objects should be copied or reused.
> >>
> >> My distant observation is that it is safer to locally assert reuse at
> the operator level than to assume or guarantee the safety of object reuse
> across an entire program. It could also be handy to mix operators receiving
> copyable objects with operators not requiring copyable objects.
> >>
> >> Greg
> >>
> >>
> >>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> >>>
> >>> Hi all!
> >>>
> >>> I would like to propose the following FLIP:
> >>>
> >>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982
> >>>
> >>> The FLIP is motivated by the fact that many users run into an
> unnecessary
> >>> kind of performance problem caused by an old design artifact.
> >>>
> >>> The required change should be reasonably small, and would help many
> users
> >>> and Flink's general standing.
> >>>
> >>> Happy to hear thoughts!
> >>>
> >>> Stephan
> >>>
> >>> ======================================
> >>>
> >>> FLIP text is below. Pictures with illustrations are only in the Wiki,
> not
> >>> supported on the mailing list.
> >>> ------------------------------------------------------------
> -------------------------------------
> >>>
> >>> Motivation
> >>>
> >>> The default behavior of the streaming runtime is to copy every element
> >>> between chained operators.
> >>>
> >>> That operation was introduced for “safety” reasons, to avoid the
> number of
> >>> cases where users can create incorrect programs by reusing mutable
> objects
> >>> (a discouraged pattern, but possible). For example when using state
> >>> backends that keep the state as objects on heap, reusing mutable
> objects
> >>> can theoretically create cases where the same object is used in
> multiple
> >>> state mappings.
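To make the heap-state hazard described above concrete, here is a minimal sketch in plain Java. It does not use any Flink classes: `Event` and `ReuseHazardSketch` are hypothetical names, the `HashMap` only stands in for a state backend that keeps state as objects on the heap, and the `copyOnHandover` flag is my shorthand for "copy between chained operators" versus "pass the object on as-is".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mutable record type; not part of Flink.
final class Event {
    String key;
    long value;

    Event(String key, long value) {
        this.key = key;
        this.value = value;
    }

    Event copy() {
        return new Event(key, value);
    }
}

final class ReuseHazardSketch {

    // Simulates an upstream function that mutates and re-emits ONE instance,
    // chained to a downstream operator that stores what it receives in a
    // heap map (a stand-in for heap-based state).
    static Map<String, Event> run(boolean copyOnHandover) {
        Map<String, Event> heapState = new HashMap<>();
        Event reused = new Event("", 0L);

        String[] keys = {"a", "b"};
        long[] values = {1L, 2L};
        for (int i = 0; i < keys.length; i++) {
            reused.key = keys[i];
            reused.value = values[i];
            // With the copy, the downstream operator gets its own object.
            Event handedOver = copyOnHandover ? reused.copy() : reused;
            heapState.put(handedOver.key, handedOver);
        }
        return heapState;
    }

    public static void main(String[] args) {
        Map<String, Event> copied = run(true);
        Map<String, Event> shared = run(false);
        System.out.println("copy:    a=" + copied.get("a").value + ", b=" + copied.get("b").value);
        System.out.println("no copy: a=" + shared.get("a").value + ", b=" + shared.get("b").value);
        System.out.println("no copy, same instance: " + (shared.get("a") == shared.get("b")));
    }
}
```

With the copy, the two keys map to separate objects. Without it, both keys map to the one instance that was mutated last, which is the "same object in multiple state mappings" case the copies were meant to rule out.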
> >>>
> >>> The effect is that many people that try Flink get much lower
> performance
> >>> than they could possibly get. From empirical evidence, almost all users
> >>> that I (Stephan) have been in touch with eventually run into this
> >>> issue.
> >>>
> >>> There are multiple observations about that design:
> >>>
> >>>
> >>>  -
> >>>
> >>>  Object copies are extremely costly. While some simple types copy virtually
> for
> >>>  free (types reliably detected as immutable are not copied at all),
> many
> >>>  real pipelines use types like Avro, Thrift, JSON, etc, which are very
> >>>  expensive to copy.
> >>>
> >>>
> >>>
> >>>  -
> >>>
> >>>  Keyed operations currently only occur after shuffles. The operations
> are
> >>>  hence the first in a pipeline and will never have a reused object
> anyways.
> >>>  That means for the most critical operation, this precaution is
> unnecessary.
> >>>
> >>>
> >>>
> >>>  -
> >>>
> >>>  The mode is inconsistent with the contract of the DataSet API, which
> >>>  does not copy at each step.
> >>>
> >>>
> >>>
> >>>  -
> >>>
> >>>  To prevent these copies, users can select {{enableObjectReuse()}},
> which
> >>>  is misleading, since it does not really reuse mutable objects, but
> avoids
> >>>  additional copies.
> >>>
> >>>
> >>> Proposal
> >>>
> >>> Summary
> >>>
> >>> I propose to change the default behavior of the DataStream runtime to
> be
> >>> the same as the DataSet runtime. That means that new objects are
> created on
> >>> every deserialization, and no copies are made as the objects are
> passed on
> >>> along the pipelines.
> >>>
> >>> Details
> >>>
> >>> I propose to drop the execution config flag {{objectReuse}} and instead
> >>> introduce an {{ObjectReuseMode}} enumeration with better control of
> what
> >>> should happen. There will be three different types:
> >>>
> >>>
> >>>  -
> >>>
> >>>  DEFAULT
> >>>  -
> >>>
> >>>     This is the default in the DataSet API
> >>>     -
> >>>
> >>>     This will become the default in the DataStream API
> >>>     -
> >>>
> >>>     This happens in the DataStream API when {{enableObjectReuse()}} is
> >>>     activated.
> >>>
> >>>
> >>>
> >>>  -
> >>>
> >>>  COPY_PER_OPERATOR
> >>>  -
> >>>
> >>>     The current default in the DataStream API
> >>>
> >>>
> >>>
> >>>  -
> >>>
> >>>  FULL_REUSE
> >>>  -
> >>>
> >>>     This happens in the DataSet API when {{enableObjectReuse()}} is
> >>>     chosen.
> >>>
> >>>
> >>> An illustration of the modes is as follows:
> >>>
> >>> DEFAULT
> >>>
> >>>
> >>> See here:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%
> 2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-
> xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&
> preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_
> mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-
> Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r>
> >>>
> >>> COPY_PER_OPERATOR
> >>>
> >>>
> >>> See here:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%
> 2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTK
> uQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-
> iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks <https://cwiki.apache.org/
> confluence/pages/viewpage.action?pageId=71012982&
> preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-
> IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-
> sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks>
> >>>
> >>>
> >>> FULL_REUSE
> >>>
> >>>
> >>> See here:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%
> 2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuF
> GinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&
> preview=/https%3A%2F%2Flh5.googleusercontent.com%
> 2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuF
> GinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE>
> >>> New or Changed Public Interfaces
> >>>
> >>> Interfaces changed
> >>>
> >>> The interface of the {{ExecutionConfig}} adds the method
> >>> {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
> >>> {{enableObjectReuse()}} and {{disableObjectReuse()}}.
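As a rough illustration of the proposed interface, here is a sketch of the enumeration and the new setter. The names `ObjectReuseMode`, `setObjectReuseMode`, and the three constants come from the proposal text; everything else is an assumption on my side: `ExecutionConfigSketch` is a hypothetical stand-in and not Flink's `ExecutionConfig`, the getter is invented for the example, and the fluent return type is only a guess at how the method might look.

```java
import java.util.Objects;

// Constants as named in the proposal; the comments paraphrase the FLIP text.
enum ObjectReuseMode {
    DEFAULT,            // new object on every deserialization, no copies along the chain
    COPY_PER_OPERATOR,  // copy every element between chained operators
    FULL_REUSE          // DataSet API behavior when enableObjectReuse() is chosen
}

// Hypothetical stand-in for the configuration class, for illustration only.
final class ExecutionConfigSketch {

    // Assumption: DEFAULT is what applies when nothing is set explicitly.
    private ObjectReuseMode objectReuseMode = ObjectReuseMode.DEFAULT;

    // Proposed setter; returning 'this' for chaining is an assumption.
    ExecutionConfigSketch setObjectReuseMode(ObjectReuseMode mode) {
        this.objectReuseMode = Objects.requireNonNull(mode, "mode");
        return this;
    }

    // Hypothetical getter, not mentioned in the proposal.
    ObjectReuseMode getObjectReuseMode() {
        return objectReuseMode;
    }
}
```

A job that relies on the old behavior would then opt back in with something like `config.setObjectReuseMode(ObjectReuseMode.COPY_PER_OPERATOR)`. The sketch deliberately leaves out the deprecated methods, because how they should map onto the new modes is not spelled out here.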
> >>>
> >>>
> >>> Behavior changed
> >>>
> >>> The default object passing behavior changes, meaning that it can
> affect the
> >>> correctness of prior DataStream programs that assume the original
> >>> “COPY_PER_OPERATOR” behavior.
> >>>
> >>> Migration Plan and Compatibility
> >>>
> >>> Interfaces
> >>>
> >>> No interface migration path is needed, because the interfaces are not
> >>> broken, merely some methods get deprecated.
> >>>
> >>> Behavior change
> >>>
> >>> Variant 1:
> >>>
> >>>  -
> >>>
> >>>  Change the behavior, make it explicit in the release notes that we did
> >>>  that and what cases are affected.
> >>>  -
> >>>
> >>>  This may actually be feasible, because the cases that are affected are
> >>>  quite pathological corner cases that only very bad implementations
> should
> >>>  encounter (see below)
> >>>
> >>>
> >>> Variant 2:
> >>>
> >>>  -
> >>>
> >>>  When users set the mode, that mode is always used.
> >>>  -
> >>>
> >>>  When the mode is not explicitly set, we follow this strategy:
> >>>  -
> >>>
> >>>     Change the CLI such that we know when users upgrade existing jobs
> >>>     (the savepoint to start from has a version prior to 1.4).
> >>>     -
> >>>
> >>>     Use DEFAULT as the default for jobs that do not start from
> savepoint,
> >>>     or that start from savepoint >= 1.4
> >>>     -
> >>>
> >>>     Use COPY_PER_OPERATOR as the default for upgraded jobs
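The decision rule of Variant 2 could be sketched roughly as follows. This is only an illustration under assumptions: `DefaultModeResolver` and its methods are hypothetical names, the savepoint version is assumed to arrive as a plain "major.minor" string, and how the CLI would actually obtain that version is left open.

```java
import java.util.Optional;

enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

// Hypothetical helper; not an existing Flink class.
final class DefaultModeResolver {

    // userMode: the mode the user set explicitly, if any.
    // savepointVersion: version of the savepoint the job starts from, if any.
    static ObjectReuseMode resolve(Optional<ObjectReuseMode> userMode,
                                   Optional<String> savepointVersion) {
        if (userMode.isPresent()) {
            return userMode.get();                 // an explicit setting always wins
        }
        if (!savepointVersion.isPresent()) {
            return ObjectReuseMode.DEFAULT;        // job does not start from a savepoint
        }
        return isAtLeast(savepointVersion.get(), 1, 4)
                ? ObjectReuseMode.DEFAULT          // savepoint >= 1.4
                : ObjectReuseMode.COPY_PER_OPERATOR; // upgraded job
    }

    // Compares "major.minor[.patch]" numerically; assumes purely numeric parts.
    static boolean isAtLeast(String version, int major, int minor) {
        String[] parts = version.split("\\.");
        int actualMajor = Integer.parseInt(parts[0]);
        int actualMinor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return actualMajor > major || (actualMajor == major && actualMinor >= minor);
    }
}
```

For example, `resolve(Optional.empty(), Optional.of("1.3"))` picks COPY_PER_OPERATOR, while the same call without a savepoint version picks DEFAULT.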
>
>
