Thanks for the write up and illustrations. :-) +1 to do this.

I'm fine with both proposed "changed behaviour" variants, but lean
towards option 1: change the default, make the change explicit in the
release notes and add a good docs page about configuring object reuse
(ideally re-using your illustrations from the FLIP).
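
For the docs page, something along these lines might help readers map
the old boolean flag onto the proposed modes. This is only a sketch: the
enum values and setObjectReuseMode(...) are taken from the FLIP text,
while ExecutionConfigSketch, Api and fromLegacyFlag are hypothetical
names I made up for illustration and are not part of Flink.

```java
import java.util.Objects;

/** The three modes named in the FLIP. */
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

/** Which API a program is written against (hypothetical helper type). */
enum Api { DATA_SET, DATA_STREAM }

/** Hypothetical stand-in for ExecutionConfig; NOT Flink's real class. */
class ExecutionConfigSketch {
    // The FLIP proposes DEFAULT as the default for both APIs.
    private ObjectReuseMode mode = ObjectReuseMode.DEFAULT;

    /** Method name as proposed in the FLIP. */
    ExecutionConfigSketch setObjectReuseMode(ObjectReuseMode mode) {
        this.mode = Objects.requireNonNull(mode, "mode");
        return this;
    }

    ObjectReuseMode getObjectReuseMode() {
        return mode;
    }

    /**
     * Maps today's boolean object-reuse flag to the mode it corresponds to,
     * following the per-mode descriptions in the FLIP text.
     */
    static ObjectReuseMode fromLegacyFlag(Api api, boolean objectReuseEnabled) {
        if (api == Api.DATA_STREAM) {
            return objectReuseEnabled
                    ? ObjectReuseMode.DEFAULT
                    : ObjectReuseMode.COPY_PER_OPERATOR;
        }
        // DataSet API: no flag means DEFAULT, flag set means FULL_REUSE.
        return objectReuseEnabled
                ? ObjectReuseMode.FULL_REUSE
                : ObjectReuseMode.DEFAULT;
    }

    public static void main(String[] args) {
        ExecutionConfigSketch config = new ExecutionConfigSketch()
                .setObjectReuseMode(ObjectReuseMode.COPY_PER_OPERATOR);
        System.out.println(config.getObjectReuseMode());
    }
}
```

A user who relies on the old copying behaviour would then opt back in
with setObjectReuseMode(ObjectReuseMode.COPY_PER_OPERATOR).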

I see that option 2 (keep COPY_PER_OPERATOR as default for upgraded
jobs if nothing else is configured) is nice in order to prevent any
surprises for users upgrading from 1.3 to 1.4. But if I understand it
correctly we only postpone the problem to their first 1.4 savepoint +
restore at which point the behaviour would still change, right? If the
answer is yes, I think that this might be more confusing than simply
changing the default (option 1) in the long run.
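
To make sure I understand option 2 correctly, here is how I read the
selection rule, as a small sketch. Variant2Defaults, resolve and isPre14
are hypothetical names of mine, and the version strings are assumed to
look like "1.3" or "1.4"; how the CLI would actually detect the
savepoint version is not specified in the FLIP.

```java
import java.util.Optional;

/** The three modes named in the FLIP. */
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

/** Hypothetical sketch of the "Variant 2" default selection. */
final class Variant2Defaults {

    private Variant2Defaults() {}

    /** Returns true if a "major.minor[.patch]" version is older than 1.4. */
    static boolean isPre14(String version) {
        String[] parts = version.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        return major < 1 || (major == 1 && minor < 4);
    }

    /**
     * @param explicitMode     mode set by the user, if any
     * @param savepointVersion version of the savepoint the job starts from, if any
     */
    static ObjectReuseMode resolve(Optional<ObjectReuseMode> explicitMode,
                                   Optional<String> savepointVersion) {
        // An explicitly configured mode always wins.
        if (explicitMode.isPresent()) {
            return explicitMode.get();
        }
        // Upgraded job: started from a savepoint written before 1.4.
        if (savepointVersion.isPresent() && isPre14(savepointVersion.get())) {
            return ObjectReuseMode.COPY_PER_OPERATOR;
        }
        // Fresh job, or savepoint >= 1.4.
        return ObjectReuseMode.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.empty(), Optional.of("1.3")));
        System.out.println(resolve(Optional.empty(), Optional.of("1.4")));
    }
}
```

With this reading, a job restored from a 1.3 savepoint resolves to
COPY_PER_OPERATOR, but the same job restored from its first 1.4
savepoint resolves to DEFAULT, which is the delayed behaviour change I
am asking about.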

– Ufuk


On Sun, Jul 2, 2017 at 6:12 PM, Stephan Ewen <se...@apache.org> wrote:
> Thank you for the reply and for the support!
>
> @Greg, controlling object reuse on a per-operator basis is definitely a good
> way to follow up. My first thought would be to keep this proposal slim and
> deal with the "default" logic, and have a followup effort to make this
> controllable per operator.
>
> @Greg When you mention the "surprises" about object reuse in the DataSet
> API, what cases and behavior do you have in mind there?
>
> Stephan
>
>
> On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> +1 for changing the default if so many people encountered problems with
>> serialisation costs.
>>
>> The first two modes don’t require any code changes, correct? Only the last
>> one would require changes to the stream input processors.
>>
>> We should also keep this issue in mind:
>> https://issues.apache.org/jira/browse/FLINK-3974
>> i.e. we always need to make shallow copies of the StreamRecord.
>>
>> Best,
>> Aljoscha
>>
>> > On 27. Jun 2017, at 21:01, Zhenzhong Xu <flyf...@gmail.com> wrote:
>> >
>> > Stephan,
>> >
>> > Fully supporting this FLIP. We originally encountered pretty big
>> surprises observing the object copy behavior causing significant
>> performance degradation for our massively parallel use case.
>> >
>> > In our case (and I think this SHOULD be the assumption for all
>> streaming use cases), we assume object immutability for all records
>> throughout the processing pipeline. As a result, I don't see a need to
>> distinguish different object reuse behaviors for different (chained)
>> operators (or, at the extreme, even a need to support
>> COPY_PER_OPERATOR, other than that we probably have to keep it for
>> backward compatibility). I am also a fan of failing fast if a user
>> asserts incorrect assumptions.
>> >
>> > One piece of feedback on FLIP-21 itself: I am not very clear on the
>> difference between the DEFAULT and FULL_REUSE enumeration values. Aren't
>> they exactly the same thing in the new proposal? However, the model
>> figures seem to indicate they are slightly different. Can you elaborate
>> a bit more?
>> >
>> > Z.
>> >
>> >
>> > On 2017-06-27 11:14 (-0700), Greg Hogan <c...@greghogan.com> wrote:
>> >> Hi Stephan,
>> >>
>> >> Would this be an appropriate time to discuss allowing reuse to be a
>> per-operator configuration? Object reuse for chained operators has led to
>> considerable surprise for some users of the DataSet API. This came up
>> during the rework of the object reuse documentation for the DataSet API.
>> With annotations a Function could mark whether input/iterator or
>> output/collected objects should be copied or reused.
>> >>
>> >> My distant observation is that it is safer to locally assert reuse at
>> the operator level than to assume or guarantee the safety of object reuse
>> across an entire program. It could also be handy to mix operators receiving
>> copyable objects with operators not requiring copyable objects.
>> >>
>> >> Greg
>> >>
>> >>
>> >>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
>> >>>
>> >>> Hi all!
>> >>>
>> >>> I would like to propose the following FLIP:
>> >>>
>> >>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>> >>>
>> >>> The FLIP is motivated by the fact that many users run into an
>> unnecessary
>> >>> kind of performance problem caused by an old design artifact.
>> >>>
>> >>> The required change should be reasonably small, and would help many
>> users
>> >>> and Flink's general standing.
>> >>>
>> >>> Happy to hear thoughts!
>> >>>
>> >>> Stephan
>> >>>
>> >>> ======================================
>> >>>
>> >>> FLIP text is below. Pictures with illustrations are only in the Wiki,
>> not
>> >>> supported on the mailing list.
>> >>> ------------------------------------------------------------
>> -------------------------------------
>> >>>
>> >>> Motivation
>> >>>
>> >>> The default behavior of the streaming runtime is to copy every element
>> >>> between chained operators.
>> >>>
>> >>> That operation was introduced for “safety” reasons, to avoid the
>> number of
>> >>> cases where users can create incorrect programs by reusing mutable
>> objects
>> >>> (a discouraged pattern, but possible). For example when using state
>> >>> backends that keep the state as objects on heap, reusing mutable
>> objects
>> >>> can theoretically create cases where the same object is used in
>> multiple
>> >>> state mappings.
>> >>>
>> >>> The effect is that many people that try Flink get much lower
>> performance
>> >>> than they could possibly get. From empirical evidence, almost all users
>> >>> that I (Stephan) have been in touch with eventually run into this
>> >>> issue.
>> >>>
>> >>> There are multiple observations about that design:
>> >>>
>> >>>
>> >>>  -
>> >>>
>> >>>  Object copies are extremely costly. While some simple types copy virtually
>> for
>> >>>  free (types reliably detected as immutable are not copied at all),
>> many
>> >>>  real pipelines use types like Avro, Thrift, JSON, etc, which are very
>> >>>  expensive to copy.
>> >>>
>> >>>
>> >>>
>> >>>  -
>> >>>
>> >>>  Keyed operations currently only occur after shuffles. The operations
>> are
>> >>>  hence the first in a pipeline and will never have a reused object
>> anyways.
>> >>>  That means for the most critical operation, this precaution is
>> unnecessary.
>> >>>
>> >>>
>> >>>
>> >>>  -
>> >>>
>> >>>  The mode is inconsistent with the contract of the DataSet API, which
>> >>>  does not copy at each step
>> >>>
>> >>>
>> >>>
>> >>>  -
>> >>>
>> >>>  To prevent these copies, users can select {{enableObjectReuse()}},
>> which
>> >>>  is misleading, since it does not really reuse mutable objects, but
>> avoids
>> >>>  additional copies.
>> >>>
>> >>>
>> >>> Proposal
>> >>>
>> >>> Summary
>> >>>
>> >>> I propose to change the default behavior of the DataStream runtime to
>> be
>> >>> the same as the DataSet runtime. That means that new objects are
>> chosen on
>> >>> every deserialization, and no copies are made as the objects are
>> passed on
>> >>> along the pipelines.
>> >>>
>> >>> Details
>> >>>
>> >>> I propose to drop the execution config flag {{objectReuse}} and instead
>> >>> introduce an {{ObjectReuseMode}} enumeration with better control of
>> what
>> >>> should happen. There will be three different types:
>> >>>
>> >>>
>> >>>  -
>> >>>
>> >>>  DEFAULT
>> >>>  -
>> >>>
>> >>>     This is the default in the DataSet API
>> >>>     -
>> >>>
>> >>>     This will become the default in the DataStream API
>> >>>     -
>> >>>
>> >>>     This happens in the DataStream API when {{enableObjectReuse()}} is
>> >>>     activated.
>> >>>
>> >>>
>> >>>
>> >>>  -
>> >>>
>> >>>  COPY_PER_OPERATOR
>> >>>  -
>> >>>
>> >>>     The current default in the DataStream API
>> >>>
>> >>>
>> >>>
>> >>>  -
>> >>>
>> >>>  FULL_REUSE
>> >>>  -
>> >>>
>> >>>     This happens in the DataSet API when {{enableObjectReuse()}} is
>> >>>     chosen.
>> >>>
>> >>>
>> >>> An illustration of the modes is as follows:
>> >>>
>> >>> DEFAULT
>> >>>
>> >>>
>> >>> See here:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
>> >>>
>> >>> COPY_PER_OPERATOR
>> >>>
>> >>>
>> >>> See here:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
>> >>>
>> >>>
>> >>> FULL_REUSE
>> >>>
>> >>>
>> >>> See here:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
>> >>>
>> >>> New or Changed Public Interfaces
>> >>>
>> >>> Interfaces changed
>> >>>
>> >>> The interface of the {{ExecutionConfig}} adds the method
>> >>> {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
>> >>> {{enableObjectReuse()}} and {{disableObjectReuse()}}.
>> >>>
>> >>>
>> >>> Behavior changed
>> >>>
>> >>> The default object passing behavior changes, meaning that it can
>> affect the
>> >>> correctness of prior DataStream programs that assume the original
>> >>> “COPY_PER_OPERATOR” behavior.
>> >>>
>> >>> Migration Plan and Compatibility
>> >>>
>> >>> Interfaces
>> >>>
>> >>> No interface migration path is needed, because the interfaces are not
>> >>> broken, merely some methods get deprecated.
>> >>>
>> >>> Behavior change
>> >>>
>> >>> Variant 1:
>> >>>
>> >>>  -
>> >>>
>> >>>  Change the behavior, make it explicit in the release notes that we did
>> >>>  that and what cases are affected.
>> >>>  -
>> >>>
>> >>>  This may actually be feasible, because the cases that are affected are
>> >>>  quite pathological corner cases that only very bad implementations
>> should
>> >>>  encounter (see below)
>> >>>
>> >>>
>> >>> Variant 2:
>> >>>
>> >>>  -
>> >>>
>> >>>  When users set the mode, that mode is always used.
>> >>>  -
>> >>>
>> >>>  When the mode is not explicitly set, we follow this strategy:
>> >>>  -
>> >>>
>> >>>     Change the CLI such that we know when users upgrade existing jobs
>> >>>     (the savepoint to start from has a version prior to 1.4).
>> >>>     -
>> >>>
>> >>>     Use DEFAULT as the default for jobs that do not start from a
>> savepoint,
>> >>>     or that start from a savepoint >= 1.4
>> >>>     -
>> >>>
>> >>>     Use COPY_PER_OPERATOR as the default for upgraded jobs
>>
>>
