Thank you for the reply and for the support! @Greg, controlling object reuse on a per-operator basis is definitely a good way to follow up. My first thought would be to keep this proposal slim and deal with the "default" logic, and have a followup effort to make this controllable per operator.
@Greg When you mention the "surprises" about object reuse in the DataSet API, what cases and behavior do you have in mind there?

Stephan

On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> +1 for changing the default if so many people encountered problems with
> serialisation costs.
>
> The first two modes don’t require any code changes, correct? Only the last
> one would require changes to the stream input processors.
>
> We should also keep this issue in mind:
> https://issues.apache.org/jira/browse/FLINK-3974
> i.e. we always need to make shallow copies of the StreamRecord.
>
> Best,
> Aljoscha
>
>
> On 27. Jun 2017, at 21:01, Zhenzhong Xu <flyf...@gmail.com> wrote:
> >
> > Stephan,
> >
> > Fully supporting this FLIP. We originally encountered pretty big
> > surprises observing the object copy behavior causing significant
> > performance degradation for our massively parallel use case.
> >
> > In our case (I think this most appropriately SHOULD be the assumption for
> > all streaming use cases), we assume object immutability for all the
> > records throughout the processing pipeline. As a result, I don't see a
> > need to distinguish different object reuse behaviors for different
> > (chained) operators (or, to the very extreme, even the need to support
> > COPY_PER_OPERATOR, other than that we probably have to support it for
> > backward compatibility). I am also a fan of failing fast if the user
> > asserts incorrect assumptions.
> >
> > One piece of feedback on FLIP-21 itself: I am not very clear on the
> > difference between the DEFAULT and FULL_REUSE enumeration values. Aren't
> > they exactly the same thing in the new proposal? However, the model
> > figures seem to indicate they are slightly different. Can you elaborate
> > a bit more?
> >
> > Z.
> >
> >
> > On 2017-06-27 11:14 (-0700), Greg Hogan <c...@greghogan.com> wrote:
> >> Hi Stephan,
> >>
> >> Would this be an appropriate time to discuss allowing reuse to be a
> >> per-operator configuration? Object reuse for chained operators has led
> >> to considerable surprise for some users of the DataSet API. This came up
> >> during the rework of the object reuse documentation for the DataSet API.
> >> With annotations a Function could mark whether input/iterator or
> >> output/collected objects should be copied or reused.
> >>
> >> My distant observation is that it is safer to locally assert reuse at
> >> the operator level than to assume or guarantee the safety of object
> >> reuse across an entire program. It could also be handy to mix operators
> >> receiving copyable objects with operators not requiring copyable objects.
> >>
> >> Greg
> >>
> >>
> >>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> >>>
> >>> Hi all!
> >>>
> >>> I would like to propose the following FLIP:
> >>>
> >>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> >>>
> >>> The FLIP is motivated by the fact that many users run into an
> >>> unnecessary kind of performance problem caused by an old design artifact.
> >>>
> >>> The required change should be reasonably small, and would help many
> >>> users and Flink's general standing.
> >>>
> >>> Happy to hear thoughts!
> >>>
> >>> Stephan
> >>>
> >>> ======================================
> >>>
> >>> FLIP text is below. Pictures with illustrations are only in the Wiki,
> >>> not supported on the mailing list.
> >>> ------------------------------------------------------------
> >>>
> >>> Motivation
> >>>
> >>> The default behavior of the streaming runtime is to copy every element
> >>> between chained operators.
> >>>
> >>> That operation was introduced for “safety” reasons, to reduce the number
> >>> of cases where users can create incorrect programs by reusing mutable
> >>> objects (a discouraged pattern, but possible). For example, when using
> >>> state backends that keep the state as objects on heap, reusing mutable
> >>> objects can theoretically create cases where the same object is used in
> >>> multiple state mappings.
> >>>
> >>> The effect is that many people that try Flink get much lower performance
> >>> than they could possibly get. From empirical evidence, almost all users
> >>> that I (Stephan) have been in touch with eventually run into this issue.
> >>>
> >>> There are multiple observations about that design:
> >>>
> >>> - Object copies are extremely costly. While some simple types copy
> >>>   virtually for free (types reliably detected as immutable are not
> >>>   copied at all), many real pipelines use types like Avro, Thrift, JSON,
> >>>   etc., which are very expensive to copy.
> >>>
> >>> - Keyed operations currently only occur after shuffles. The operations
> >>>   are hence the first in a pipeline and will never have a reused object
> >>>   anyway. That means for the most critical operation, this precaution is
> >>>   unnecessary.
> >>>
> >>> - The mode is inconsistent with the contract of the DataSet API, which
> >>>   does not copy at each step.
> >>>
> >>> - To prevent these copies, users can select {{enableObjectReuse()}},
> >>>   which is misleading, since it does not really reuse mutable objects,
> >>>   but avoids additional copies.
> >>>
> >>>
> >>> Proposal
> >>>
> >>> Summary
> >>>
> >>> I propose to change the default behavior of the DataStream runtime to be
> >>> the same as the DataSet runtime.
> >>> That means that new objects are chosen on every deserialization, and no
> >>> copies are made as the objects are passed on along the pipelines.
> >>>
> >>> Details
> >>>
> >>> I propose to drop the execution config flag {{objectReuse}} and instead
> >>> introduce an {{ObjectReuseMode}} enumeration with better control of what
> >>> should happen. There will be three different types:
> >>>
> >>> - DEFAULT
> >>>   - This is the default in the DataSet API
> >>>   - This will become the default in the DataStream API
> >>>   - This happens in the DataStream API when {{enableObjectReuse()}} is
> >>>     activated.
> >>>
> >>> - COPY_PER_OPERATOR
> >>>   - The current default in the DataStream API
> >>>
> >>> - FULL_REUSE
> >>>   - This happens in the DataSet API when {{enableObjectReuse()}} is
> >>>     chosen.
> >>>
> >>>
> >>> An illustration of the modes is as follows:
> >>>
> >>> DEFAULT
> >>>
> >>> See here:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
> >>>
> >>> COPY_PER_OPERATOR
> >>>
> >>> See here:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
> >>>
> >>> FULL_REUSE
> >>>
> >>> See here:
> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
> >>>
> >>> New or Changed Public Interfaces
> >>>
> >>> Interfaces changed
> >>>
> >>> The interface of the {{ExecutionConfig}} adds the method
> >>> {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
> >>> {{enableObjectReuse()}} and {{disableObjectReuse()}}.
> >>>
> >>>
> >>> Behavior changed
> >>>
> >>> The default object passing behavior changes, meaning that it can affect
> >>> the correctness of prior DataStream programs that assume the original
> >>> “COPY_PER_OPERATOR” behavior.
> >>>
> >>> Migration Plan and Compatibility
> >>>
> >>> Interfaces
> >>>
> >>> No interface migration path is needed, because the interfaces are not
> >>> broken, merely some methods get deprecated.
> >>>
> >>> Behavior change
> >>>
> >>> Variant 1:
> >>>
> >>> - Change the behavior, make it explicit on the release notes that we did
> >>>   that and what cases are affected.
> >>> - This may actually be feasible, because the cases that are affected are
> >>>   quite pathological corner cases that only very bad implementations
> >>>   should encounter (see below)
> >>>
> >>>
> >>> Variant 2:
> >>>
> >>> - When users set the mode, always that mode is used.
> >>> - When the mode is not explicitly set, we follow that strategy:
> >>>   - Change the CLI such that we know when users upgrade existing jobs
> >>>     (the savepoint to start from has a version prior to 1.4).
> >>>   - Use DEFAULT as the default for jobs that do not start from savepoint,
> >>>     or that start from savepoint >= 1.4
> >>>   - Use COPY_PER_OPERATOR as the default for upgraded jobs
> >
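
To make the Variant 2 strategy quoted above concrete, here is a minimal, self-contained sketch of how the effective mode could be resolved. It is not Flink code: the ObjectReuseMode enum only mirrors the three values named in the FLIP, and ObjectReuseDefaults, resolve() and isBefore14() are hypothetical names made up for illustration. It also assumes the savepoint version is available as a plain numeric "major.minor[.patch]" string.

```java
import java.util.Optional;

/** Hypothetical mirror of the enumeration proposed in FLIP-21 (not an existing Flink class). */
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

/** Hypothetical helper sketching the "Variant 2" default-selection rules. */
final class ObjectReuseDefaults {

    private ObjectReuseDefaults() {}

    /**
     * @param userMode         the mode the user set explicitly, if any
     * @param savepointVersion version of the savepoint the job starts from, if any
     *                         (assumed format: "major.minor" or "major.minor.patch")
     */
    static ObjectReuseMode resolve(Optional<ObjectReuseMode> userMode,
                                   Optional<String> savepointVersion) {
        // Rule 1: an explicitly configured mode always wins.
        if (userMode.isPresent()) {
            return userMode.get();
        }
        // Rule 2: upgraded jobs (savepoint older than 1.4) keep the old copying behavior.
        if (savepointVersion.isPresent() && isBefore14(savepointVersion.get())) {
            return ObjectReuseMode.COPY_PER_OPERATOR;
        }
        // Rule 3: fresh jobs and jobs resuming from savepoints >= 1.4 get the new default.
        return ObjectReuseMode.DEFAULT;
    }

    private static boolean isBefore14(String version) {
        String[] parts = version.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major < 1 || (major == 1 && minor < 4);
    }

    public static void main(String[] args) {
        // Fresh job, nothing configured.
        System.out.println(resolve(Optional.empty(), Optional.empty()));
        // Job upgraded from a 1.3 savepoint, nothing configured.
        System.out.println(resolve(Optional.empty(), Optional.of("1.3.1")));
        // Explicit user choice on an upgraded job.
        System.out.println(resolve(Optional.of(ObjectReuseMode.FULL_REUSE), Optional.of("1.3.1")));
    }
}
```

The ordering of the checks is the point of the sketch: the savepoint version is consulted only when the user has not chosen a mode, so an explicit setting is never overridden during an upgrade.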