Thanks for the write-up and illustrations. :-) +1 to do this. I'm fine with both proposed "changed behaviour" variants, but lean towards option 1: change the default, make the change explicit in the release notes and add a good docs page about configuring object reuse (ideally reusing your illustrations from the FLIP).
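A docs page like the one suggested here could include a small model of the three modes. The Java sketch below is a self-contained simulation, not Flink code. `ObjectReuseMode` only mirrors the enumeration proposed in FLIP-21 (quoted below), and `Event`, `run` and the "deserialize -> user map -> stateful operator" chain are hypothetical. It assumes one reading of the FLIP's illustrations: DEFAULT allocates a fresh object per deserialization and never copies between chained operators, COPY_PER_OPERATOR additionally copies on every hand-over, and FULL_REUSE refills a single instance on deserialization. The last operator keeps references to what it receives, as an on-heap state backend would.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of a record travelling through a chain of
 * "deserialize -> user map function -> stateful operator" under each mode.
 * None of these types are Flink APIs.
 */
public class ObjectReuseDemo {

    /** Mirrors the enumeration proposed in FLIP-21 (illustration only). */
    public enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /** A mutable record, standing in for an Avro/Thrift/JSON object. */
    static final class Event {
        String key;
        Event(String key) { this.key = key; }
        Event copy() { return new Event(key); }
    }

    /**
     * Pushes the keys through the chain and returns the keys the stateful
     * operator observes afterwards through the references it kept.
     *
     * @param userReusesOutput if true, the user map function writes every result
     *                         into one mutable output object (the discouraged pattern)
     */
    public static List<String> run(ObjectReuseMode mode, boolean userReusesOutput, String... keys) {
        Event deserializeTarget = new Event(null); // only used by FULL_REUSE
        Event userOutput = new Event(null);        // only used when userReusesOutput is true
        List<Event> keptByState = new ArrayList<>();

        for (String k : keys) {
            // 1) Deserialization: FULL_REUSE refills one instance, the other modes allocate.
            Event in;
            if (mode == ObjectReuseMode.FULL_REUSE) {
                deserializeTarget.key = k;
                in = deserializeTarget;
            } else {
                in = new Event(k);
            }

            // 2) User map function: forward the input, or fill the shared output object.
            Event out;
            if (userReusesOutput) {
                userOutput.key = in.key;
                out = userOutput;
            } else {
                out = in;
            }

            // 3) Hand-over to the chained stateful operator: only COPY_PER_OPERATOR copies.
            keptByState.add(mode == ObjectReuseMode.COPY_PER_OPERATOR ? out.copy() : out);
        }

        List<String> observed = new ArrayList<>();
        for (Event e : keptByState) {
            observed.add(e.key);
        }
        return observed;
    }

    public static void main(String[] args) {
        for (ObjectReuseMode mode : ObjectReuseMode.values()) {
            System.out.println(mode + " well-behaved: " + run(mode, false, "a", "b", "c")
                    + ", reused output: " + run(mode, true, "a", "b", "c"));
        }
    }
}
```

Under these assumptions, DEFAULT only goes wrong when a user function reuses its own output object, which matches the FLIP's claim that the affected cases are corner cases, while FULL_REUSE also aliases every input the stateful operator retains. That is one way to read the DEFAULT vs. FULL_REUSE difference asked about further down the thread.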
I see that option 2 (keep COPY_PER_OPERATOR as the default for upgraded jobs if nothing else is configured) is nice because it prevents surprises for users upgrading from 1.3 to 1.4. But if I understand it correctly, we would only postpone the problem to their first 1.4 savepoint + restore, at which point the behaviour would still change, right? If the answer is yes, I think that this might be more confusing in the long run than simply changing the default (option 1).

– Ufuk

On Sun, Jul 2, 2017 at 6:12 PM, Stephan Ewen <se...@apache.org> wrote:

> Thank you for the reply and for the support!
>
> @Greg, controlling object reuse on a per-operator basis is definitely a good way to follow up. My first thought would be to keep this proposal slim and deal with the "default" logic, and have a follow-up effort to make this controllable per operator.
>
> @Greg When you mention the "surprises" about object reuse in the DataSet API, what cases and behavior do you have in mind there?
>
> Stephan
>
>
> On Wed, Jun 28, 2017 at 2:56 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> +1 for changing the default if so many people encountered problems with serialisation costs.
>>
>> The first two modes don’t require any code changes, correct? Only the last one would require changes to the stream input processors.
>>
>> We should also keep this issue in mind: https://issues.apache.org/jira/browse/FLINK-3974
>> i.e., we always need to make shallow copies of the StreamRecord.
>>
>> Best,
>> Aljoscha
>>
>> > On 27. Jun 2017, at 21:01, Zhenzhong Xu <flyf...@gmail.com> wrote:
>> >
>> > Stephan,
>> >
>> > Fully supporting this FLIP. We were originally quite surprised to observe that the object copy behavior caused significant performance degradation for our massively parallel use case.
>> >
>> > In our case (and I think this SHOULD be the assumption for all streaming use cases), we assume object immutability for all records throughout the processing pipeline. As a result, I don't see a need to distinguish different object reuse behaviors for different (chained) operators (or, at the extreme, even a need to support COPY_PER_OPERATOR beyond what we probably have to keep for backward compatibility). I am also a fan of failing fast if the user asserts incorrect assumptions.
>> >
>> > One piece of feedback on FLIP-21 itself: I am not very clear on the difference between the DEFAULT and FULL_REUSE enumeration values. Aren't they exactly the same thing in the new proposal? However, the model figures seem to indicate that they are slightly different. Can you elaborate a bit more?
>> >
>> > Z.
>> >
>> >
>> > On 2017-06-27 11:14 (-0700), Greg Hogan <c...@greghogan.com> wrote:
>> >> Hi Stephan,
>> >>
>> >> Would this be an appropriate time to discuss allowing reuse to be a per-operator configuration? Object reuse for chained operators has led to considerable surprise for some users of the DataSet API. This came up during the rework of the object reuse documentation for the DataSet API. With annotations, a Function could mark whether input/iterator or output/collected objects should be copied or reused.
>> >>
>> >> My distant observation is that it is safer to locally assert reuse at the operator level than to assume or guarantee the safety of object reuse across an entire program. It could also be handy to mix operators receiving copyable objects with operators not requiring copyable objects.
>> >>
>> >> Greg
>> >>
>> >>
>> >>> On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
>> >>>
>> >>> Hi all!
>> >>>
>> >>> I would like to propose the following FLIP:
>> >>>
>> >>> FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
>> >>>
>> >>> The FLIP is motivated by the fact that many users run into an unnecessary kind of performance problem caused by an old design artifact.
>> >>>
>> >>> The required change should be reasonably small, and would help many users and Flink's general standing.
>> >>>
>> >>> Happy to hear thoughts!
>> >>>
>> >>> Stephan
>> >>>
>> >>> ======================================
>> >>>
>> >>> FLIP text is below. Pictures with illustrations are only in the Wiki, not supported on the mailing list.
>> >>> ------------------------------------------------------------
>> >>>
>> >>> Motivation
>> >>>
>> >>> The default behavior of the streaming runtime is to copy every element between chained operators.
>> >>>
>> >>> That operation was introduced for "safety" reasons, to reduce the number of cases where users can create incorrect programs by reusing mutable objects (a discouraged pattern, but possible). For example, when using state backends that keep the state as objects on heap, reusing mutable objects can theoretically create cases where the same object is used in multiple state mappings.
>> >>>
>> >>> The effect is that many people who try Flink get much lower performance than they could possibly get. From empirical evidence, almost all users that I (Stephan) have been in touch with eventually run into this issue.
>> >>>
>> >>> There are multiple observations about that design:
>> >>>
>> >>>   - Object copies are extremely costly. While some simple types copy virtually for free (types reliably detected as immutable are not copied at all), many real pipelines use types like Avro, Thrift, JSON, etc., which are very expensive to copy.
>> >>>
>> >>>   - Keyed operations currently only occur after shuffles. The operations are hence the first in a pipeline and will never have a reused object anyway. That means for the most critical operation, this precaution is unnecessary.
>> >>>
>> >>>   - The mode is inconsistent with the contract of the DataSet API, which does not copy at each step.
>> >>>
>> >>>   - To prevent these copies, users can select {{enableObjectReuse()}}, which is misleading, since it does not really reuse mutable objects, but avoids additional copies.
>> >>>
>> >>> Proposal
>> >>>
>> >>> Summary
>> >>>
>> >>> I propose to change the default behavior of the DataStream runtime to be the same as the DataSet runtime. That means that new objects are created on every deserialization, and no copies are made as the objects are passed along the pipeline.
>> >>>
>> >>> Details
>> >>>
>> >>> I propose to drop the execution config flag {{objectReuse}} and instead introduce an {{ObjectReuseMode}} enumeration with better control of what should happen. There will be three different types:
>> >>>
>> >>>   - DEFAULT
>> >>>       - This is the default in the DataSet API
>> >>>       - This will become the default in the DataStream API
>> >>>       - This happens in the DataStream API when {{enableObjectReuse()}} is activated.
>> >>>
>> >>>   - COPY_PER_OPERATOR
>> >>>       - The current default in the DataStream API
>> >>>
>> >>>   - FULL_REUSE
>> >>>       - This happens in the DataSet API when {{enableObjectReuse()}} is chosen.
>> >>>
>> >>> An illustration of the modes is as follows:
>> >>>
>> >>> DEFAULT
>> >>>
>> >>> See here:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
>> >>>
>> >>> COPY_PER_OPERATOR
>> >>>
>> >>> See here:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
>> >>>
>> >>> FULL_REUSE
>> >>>
>> >>> See here:
>> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
>> >>>
>> >>> New or Changed Public Interfaces
>> >>>
>> >>> Interfaces changed
>> >>>
>> >>> The {{ExecutionConfig}} interface adds the method {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods {{enableObjectReuse()}} and {{disableObjectReuse()}}.
>> >>>
>> >>> Behavior changed
>> >>>
>> >>> The default object passing behavior changes, meaning that it can affect the correctness of prior DataStream programs that assume the original "COPY_PER_OPERATOR" behavior.
>> >>>
>> >>> Migration Plan and Compatibility
>> >>>
>> >>> Interfaces
>> >>>
>> >>> No interface migration path is needed, because the interfaces are not broken; some methods merely get deprecated.
>> >>>
>> >>> Behavior change
>> >>>
>> >>> Variant 1:
>> >>>
>> >>>   - Change the behavior, and make it explicit in the release notes that we did that and which cases are affected.
>> >>>   - This may actually be feasible, because the affected cases are quite pathological corner cases that only very bad implementations should encounter (see below).
>> >>>
>> >>> Variant 2:
>> >>>
>> >>>   - When users set the mode explicitly, that mode is always used.
>> >>>   - When the mode is not explicitly set, we follow this strategy:
>> >>>       - Change the CLI such that we know when users upgrade existing jobs (the savepoint to start from has a version prior to 1.4).
>> >>>       - Use DEFAULT as the default for jobs that do not start from a savepoint, or that start from a savepoint >= 1.4.
>> >>>       - Use COPY_PER_OPERATOR as the default for upgraded jobs.
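Ufuk's question about option 2 at the top of the thread can be checked against the Variant 2 rule directly. The Java sketch below encodes that rule as a pure function. The enum only mirrors the proposed {{ObjectReuseMode}}; `resolve`, its nullable parameters and the "major.minor" version string are assumptions for illustration, not Flink or CLI APIs.

```java
/**
 * Hypothetical encoding of the "Variant 2" default-resolution rule from FLIP-21.
 * Not Flink code; names and parameter types are illustrative assumptions.
 */
public class ReuseModeResolution {

    /** Mirrors the enumeration proposed in FLIP-21 (illustration only). */
    public enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

    /**
     * @param explicitMode     mode set by the user, or null if none was configured
     * @param savepointVersion "major.minor" version that wrote the savepoint the job
     *                         starts from, or null if the job starts without a savepoint
     */
    public static ObjectReuseMode resolve(ObjectReuseMode explicitMode, String savepointVersion) {
        // An explicitly configured mode always wins.
        if (explicitMode != null) {
            return explicitMode;
        }
        // Upgraded jobs (savepoint written before 1.4) keep the old default.
        if (savepointVersion != null && writtenBefore14(savepointVersion)) {
            return ObjectReuseMode.COPY_PER_OPERATOR;
        }
        // Fresh jobs, and jobs restored from a 1.4+ savepoint, get the new default.
        return ObjectReuseMode.DEFAULT;
    }

    private static boolean writtenBefore14(String version) {
        String[] parts = version.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        return major < 1 || (major == 1 && minor < 4);
    }

    public static void main(String[] args) {
        // A 1.3 job upgraded to 1.4 keeps COPY_PER_OPERATOR ...
        System.out.println(resolve(null, "1.3"));
        // ... but after its first 1.4 savepoint + restore it switches to DEFAULT.
        System.out.println(resolve(null, "1.4"));
    }
}
```

The two calls in `main` print COPY_PER_OPERATOR and then DEFAULT for the same job, which is the concern raised above: unless the user pins the mode explicitly, Variant 2 only defers the behaviour change to the first restore from a 1.4 savepoint.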