Hi,

Why do you think this would mean "much code changes"? I think it would actually be a pretty lightweight change in HeapReducingState.
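To make that concrete, here is a rough, self-contained sketch of the idea (simplified Java, not the actual Flink code: `SketchReducingState` and `deepCopy` are made-up names for illustration, with `deepCopy` standing in for what a `TypeSerializer` copy would do):

```java
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Hypothetical sketch only -- NOT the real HeapReducingState.
// 'deepCopy' stands in for a TypeSerializer deep copy.
class SketchReducingState<T> {
    private final UnaryOperator<T> deepCopy;  // deep-copies a value
    private final BinaryOperator<T> reduce;   // the user's ReduceFunction
    private T current;                        // accumulator owned by the state

    SketchReducingState(UnaryOperator<T> deepCopy, BinaryOperator<T> reduce) {
        this.deepCopy = deepCopy;
        this.reduce = reduce;
    }

    void add(T value) {
        if (current == null) {
            // Copy only the FIRST value, so the state owns its accumulator
            // and the ReduceFunction may safely mutate it later.
            current = deepCopy.apply(value);
        } else {
            // The ReduceFunction can reuse 'current' as its result object;
            // no further copies are needed.
            current = reduce.apply(current, value);
        }
    }

    T get() {
        return current;
    }
}
```

So only the first value per key is ever copied, and the ReduceFunction is then free to mutate and return the accumulator on every later add() without leaking references held elsewhere (e.g. by other window panes).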
The proposal is to copy the *first* value that goes into a ReducingState. The copy would be done by a TypeSerializer and hence be a deep copy. This allows the copy to be reused in each invocation of the ReduceFunction instead of creating a new result object on every call. I think the savings from reusing the object in each invocation, and not creating new objects, should amortize the one-time copy.

Fabian

2016-11-23 3:04 GMT+01:00 sjk <shijinkui...@163.com>:

> Hi, Fabian
>
> That is a lot of code changes. Can you show us the key changes for the
> object copy?
> An object reference may hold deeper references; it can be a bomb.
> Could we instead rebuild an object from its data, or use Kryo directly
> for object serialization?
> I am not in favor of object copies.
>
>
> On Nov 22, 2016, at 20:33, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > Does anybody have objections against copying the first record that goes
> > into the ReduceState?
> >
> > 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >
> >> That's right, yes.
> >>
> >> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com> wrote:
> >>
> >>> Right, but that would be a much bigger change than "just" copying the
> >>> *first* record that goes into the ReduceState, or am I missing
> >>> something?
> >>>
> >>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >>>
> >>>> To bring over my comment from the GitHub PR that started this
> >>>> discussion:
> >>>>
> >>>> @wuchong <https://github.com/wuchong>, yes, this is a problem with
> >>>> the HeapStateBackend. The RocksDB backend does not suffer from this
> >>>> problem. I think in the long run we should migrate the
> >>>> HeapStateBackend to always keep data in serialised form; then we
> >>>> won't have this problem anymore.
> >>>>
> >>>> So I'm very much in favour of keeping data serialised. Copying data
> >>>> would only ever be a stopgap solution.
> >>>>
> >>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com> wrote:
> >>>>
> >>>>> Another approach that would solve the problem for our use case
> >>>>> (object re-use for incremental window ReduceFunctions) would be to
> >>>>> copy the first object that is put into the state.
> >>>>> This would be a change to the ReduceState, not to the overall state
> >>>>> backend, which should be feasible, no?
> >>>>>
> >>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> >>>>>
> >>>>>> -1 for copying objects.
> >>>>>>
> >>>>>> Storing serialized data where possible is good, but copying all
> >>>>>> objects by default is not a good idea, in my opinion.
> >>>>>> A lot of scenarios use data types that are hellishly expensive to
> >>>>>> copy. Even the current copy on chain handover is a problem.
> >>>>>>
> >>>>>> Let's not introduce even more copies.
> >>>>>>
> >>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> It will come with a performance overhead when updating the state,
> >>>>>>> but I think it would also make it possible to perform asynchronous
> >>>>>>> snapshots using the HeapStateBackend (probably some changes to the
> >>>>>>> underlying data structures would be needed), which would bring
> >>>>>>> more predictable performance.
> >>>>>>>
> >>>>>>> thanks,
> >>>>>>> maciek
> >>>>>>>
> >>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>> I would be in favour of this since it brings things in line with
> >>>>>>>> the RocksDB backend. It will, however, come with quite a
> >>>>>>>> performance overhead, depending on how fast the TypeSerializer
> >>>>>>>> can copy.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi everybody,
> >>>>>>>>>
> >>>>>>>>> When implementing a ReduceFunction for incremental aggregation
> >>>>>>>>> of SQL / Table API window aggregates, we noticed that the
> >>>>>>>>> HeapStateBackend does not store copies but holds references to
> >>>>>>>>> the original objects. In case of a SlidingWindow, the same
> >>>>>>>>> object is referenced from different window panes. Therefore, it
> >>>>>>>>> is not possible to modify these objects (in order to avoid
> >>>>>>>>> object instantiations, see discussion [1]).
> >>>>>>>>>
> >>>>>>>>> Other state backends serialize their data, so the behavior is
> >>>>>>>>> not consistent across backends.
> >>>>>>>>> If we want to have light-weight tests, we have to create new
> >>>>>>>>> objects in the ReduceFunction, causing unnecessary overhead.
> >>>>>>>>>
> >>>>>>>>> I would propose to copy objects when storing them in a
> >>>>>>>>> HeapStateBackend. This would ensure that objects returned from
> >>>>>>>>> state to the user behave identically across state backends.
> >>>>>>>>>
> >>>>>>>>> We created a related JIRA [2] that asks to copy records that go
> >>>>>>>>> into an incremental ReduceFunction. Its scope is narrower and
> >>>>>>>>> would solve our problem, but it would leave the inconsistent
> >>>>>>>>> behavior of the state backends in place.
> >>>>>>>>>
> >>>>>>>>> What do others think?
> >>>>>>>>>
> >>>>>>>>> Cheers, Fabian
> >>>>>>>>>
> >>>>>>>>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> >>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105