Hi Aljoscha, sure, there many issues with holding the state as objects on the heap. However, I think we don't have to solve all problems related to that in order to add a small fix that solves one specific issue. I would not explicitly expose the fix to users but it would be nice if we could implement more efficient code for internal functions.
Moreover, I think we should extend the documentation and clearly point out the limitations regarding modifying state objects. Best, Fabian 2016-11-23 12:07 GMT+01:00 sjk <shijinkui...@163.com>: > hi,Fabian Hueske, Sorry for mistake for the whole PR #2792 > > > On Nov 23, 2016, at 17:10, Fabian Hueske <fhue...@gmail.com> wrote: > > > > Hi, > > > > Why do you think that this means "much code changes"? > > I think it would actually be a pretty lightweight change in > > HeapReducingState. > > > > The proposal is to copy the *first* value that goes into a ReducingState. > > The copy would be done by a TypeSerializer and hence be a deep copy. > > This will allow to reuse the copy in each invocation of the > ReduceFunction > > instead of creating a new result object of the same type that was > initially > > copied. > > > > I think the savings of reusing the object in each invocation of the > > ReduceFunction and not creating a new object should amortize the one-time > > object copy. > > > > Fabian > > > > 2016-11-23 3:04 GMT+01:00 sjk <shijinkui...@163.com>: > > > >> Hi, Fabian > >> > >> So much code changes. Can you show us the key changes code for the > object > >> copy? > >> Object reference maybe hold more deep reference, it can be a bomb. > >> Can we renew a object with its data or direct use kryo for object > >> serialization? > >> I’m not prefer object copy. > >> > >> > >>> On Nov 22, 2016, at 20:33, Fabian Hueske <fhue...@gmail.com> wrote: > >>> > >>> Does anybody have objections against copying the first record that goes > >>> into the ReduceState? > >>> > >>> 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>: > >>> > >>>> That's right, yes. > >>>> > >>>> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com> wrote: > >>>> > >>>>> Right, but that would be a much bigger change than "just" copying the > >>>>> *first* record that goes into the ReduceState, or am I missing > >> something? > >>>>> > >>>>> > >>>>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>: > >>>>> > >>>>>> To bring over my comment from the Github PR that started this > >>>> discussion: > >>>>>> > >>>>>> @wuchong <https://github.com/wuchong>, yes this is a problem with > the > >>>>>> HeapStateBackend. The RocksDB backend does not suffer from this > >>>> problem. > >>>>> I > >>>>>> think in the long run we should migrate the HeapStateBackend to > always > >>>>> keep > >>>>>> data in serialised form, then we also won't have this problem > anymore. > >>>>>> > >>>>>> So I'm very much in favour of keeping data serialised. Copying data > >>>> would > >>>>>> only ever be a stopgap solution. > >>>>>> > >>>>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com> > wrote: > >>>>>> > >>>>>>> Another approach that would solve the problem for our use case > >>>> (object > >>>>>>> re-usage for incremental window ReduceFunctions) would be to copy > the > >>>>>> first > >>>>>>> object that is put into the state. > >>>>>>> This would be a change on the ReduceState, not on the overall state > >>>>>>> backend, which should be feasible, no? > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>: > >>>>>>> > >>>>>>>> -1 for copying objects. > >>>>>>>> > >>>>>>>> Storing a serialized data where possible is good, but copying all > >>>>>> objects > >>>>>>>> by default is not a good idea, in my opinion. > >>>>>>>> A lot of scenarios use data types that are hellishly expensive to > >>>>> copy. > >>>>>>>> Even the current copy on chain handover is a problem. > >>>>>>>> > >>>>>>>> Let's not introduce even more copies. > >>>>>>>> > >>>>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl> > >>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> it will come with performance overhead when updating the state, > >>>>> but I > >>>>>>>>> think it'll be possible to perform asynchronous snapshots using > >>>>>>>>> HeapStateBackend (probably some changes to underlying data > >>>>> structures > >>>>>>>> would > >>>>>>>>> be needed) - which would bring more predictable performance. > >>>>>>>>> > >>>>>>>>> thanks, > >>>>>>>>> maciek > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote: > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> I would be in favour of this since it brings things in line with > >>>>> the > >>>>>>>>>> RocksDB backend. This will, however, come with quite the > >>>>> performance > >>>>>>>>>> overhead, depending on how fast the TypeSerializer can copy. > >>>>>>>>>> > >>>>>>>>>> Cheers, > >>>>>>>>>> Aljoscha > >>>>>>>>>> > >>>>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com> > >>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>> Hi everybody, > >>>>>>>>>>> > >>>>>>>>>>> when implementing a ReduceFunction for incremental aggregation > >>>> of > >>>>>>> SQL / > >>>>>>>>>>> Table API window aggregates we noticed that the > >>>> HeapStateBackend > >>>>>> does > >>>>>>>> not > >>>>>>>>>>> store copies but holds references to the original objects. In > >>>>> case > >>>>>>> of a > >>>>>>>>>>> SlidingWindow, the same object is referenced from different > >>>>> window > >>>>>>>> panes. > >>>>>>>>>>> Therefore, it is not possible to modify these objects (in order > >>>>> to > >>>>>>>> avoid > >>>>>>>>>>> object instantiations, see discussion [1]). > >>>>>>>>>>> > >>>>>>>>>>> Other state backends serialize their data such that the > >>>> behavior > >>>>> is > >>>>>>> not > >>>>>>>>>>> consistent across backends. > >>>>>>>>>>> If we want to have light-weight tests, we have to create new > >>>>>> objects > >>>>>>> in > >>>>>>>>>>> the > >>>>>>>>>>> ReduceFunction causing unnecessary overhead. > >>>>>>>>>>> > >>>>>>>>>>> I would propose to copy objects when storing them in a > >>>>>>>> HeapStateBackend. > >>>>>>>>>>> This would ensure that objects returned from state to the user > >>>>>> behave > >>>>>>>>>>> identical for different state backends. > >>>>>>>>>>> > >>>>>>>>>>> We created a related JIRA [2] that asks to copy records that go > >>>>>> into > >>>>>>> an > >>>>>>>>>>> incremental ReduceFunction. The scope is more narrow and would > >>>>>> solve > >>>>>>>> our > >>>>>>>>>>> problem, but would leave the inconsistent behavior of state > >>>>>> backends > >>>>>>> in > >>>>>>>>>>> place. > >>>>>>>>>>> > >>>>>>>>>>> What do others think? > >>>>>>>>>>> > >>>>>>>>>>> Cheers, Fabian > >>>>>>>>>>> > >>>>>>>>>>> [1] > >>>>> https://github.com/apache/flink/pull/2792#discussion_r88653721 > >>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105 > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >> > >> > >> > > >