Hi Fabian,

That is a lot of code changes. Can you show us the key code changes for the object copy? An object reference may hold deeper references underneath, so a copy can turn into a bomb. Could we instead recreate an object from its data, or use Kryo directly for object serialization? I would prefer not to copy objects.
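To make the question concrete, here is a rough sketch of what I imagine "copy the first record that goes into the ReduceState" would look like. This is not the actual patch; the class and field names below are invented for illustration, and a Kryo deep copy is shown as the alternative I asked about:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import com.esotericsoftware.kryo.Kryo;

/**
 * Sketch only -- not the real Flink class. Shows the idea of copying just
 * the first record that enters a heap-based reducing state, so the stored
 * value never aliases an object that the caller (or another window pane)
 * still holds and may mutate.
 */
class HeapReducingStateSketch<T> {

    private final TypeSerializer<T> serializer;   // could itself be Kryo-based
    private final ReduceFunction<T> reduceFunction;
    private T current;                             // value kept on the heap as a plain object

    HeapReducingStateSketch(TypeSerializer<T> serializer, ReduceFunction<T> reduceFunction) {
        this.serializer = serializer;
        this.reduceFunction = reduceFunction;
    }

    void add(T value) throws Exception {
        if (current == null) {
            // Defensive copy of the *first* record only; later records are
            // folded into this private copy by the ReduceFunction.
            current = serializer.copy(value);
        } else {
            current = reduceFunction.reduce(current, value);
        }
    }

    T get() {
        return current;
    }

    /** The Kryo alternative I asked about: a deep copy via Kryo's copy support. */
    static <X> X kryoDeepCopy(Kryo kryo, X value) {
        return kryo.copy(value);
    }
}

Either way the cost depends on how deep the object graph is, which is exactly my worry about making copies the default.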
> On Nov 22, 2016, at 20:33, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Does anybody have objections against copying the first record that goes
> into the ReduceState?
>
> 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> That's right, yes.
>>
>> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Right, but that would be a much bigger change than "just" copying the
>>> *first* record that goes into the ReduceState, or am I missing something?
>>>
>>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> To bring over my comment from the Github PR that started this discussion:
>>>>
>>>> @wuchong <https://github.com/wuchong>, yes this is a problem with the
>>>> HeapStateBackend. The RocksDB backend does not suffer from this problem.
>>>> I think in the long run we should migrate the HeapStateBackend to always
>>>> keep data in serialised form, then we also won't have this problem anymore.
>>>>
>>>> So I'm very much in favour of keeping data serialised. Copying data would
>>>> only ever be a stopgap solution.
>>>>
>>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> Another approach that would solve the problem for our use case (object
>>>>> re-usage for incremental window ReduceFunctions) would be to copy the
>>>>> first object that is put into the state.
>>>>> This would be a change on the ReduceState, not on the overall state
>>>>> backend, which should be feasible, no?
>>>>>
>>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
>>>>>
>>>>>> -1 for copying objects.
>>>>>>
>>>>>> Storing a serialized data where possible is good, but copying all
>>>>>> objects by default is not a good idea, in my opinion.
>>>>>> A lot of scenarios use data types that are hellishly expensive to copy.
>>>>>> Even the current copy on chain handover is a problem.
>>>>>>
>>>>>> Let's not introduce even more copies.
>>>>>>
>>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> it will come with performance overhead when updating the state, but I
>>>>>>> think it'll be possible to perform asynchronous snapshots using
>>>>>>> HeapStateBackend (probably some changes to underlying data structures
>>>>>>> would be needed) - which would bring more predictable performance.
>>>>>>>
>>>>>>> thanks,
>>>>>>> maciek
>>>>>>>
>>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I would be in favour of this since it brings things in line with the
>>>>>>>> RocksDB backend. This will, however, come with quite the performance
>>>>>>>> overhead, depending on how fast the TypeSerializer can copy.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi everybody,
>>>>>>>>>
>>>>>>>>> when implementing a ReduceFunction for incremental aggregation of SQL /
>>>>>>>>> Table API window aggregates we noticed that the HeapStateBackend does not
>>>>>>>>> store copies but holds references to the original objects. In case of a
>>>>>>>>> SlidingWindow, the same object is referenced from different window panes.
>>>>>>>>> Therefore, it is not possible to modify these objects (in order to avoid
>>>>>>>>> object instantiations, see discussion [1]).
>>>>>>>>>
>>>>>>>>> Other state backends serialize their data such that the behavior is not
>>>>>>>>> consistent across backends.
>>>>>>>>> If we want to have light-weight tests, we have to create new objects in the
>>>>>>>>> ReduceFunction causing unnecessary overhead.
>>>>>>>>>
>>>>>>>>> I would propose to copy objects when storing them in a HeapStateBackend.
>>>>>>>>> This would ensure that objects returned from state to the user behave
>>>>>>>>> identical for different state backends.
>>>>>>>>>
>>>>>>>>> We created a related JIRA [2] that asks to copy records that go into an
>>>>>>>>> incremental ReduceFunction. The scope is more narrow and would solve our
>>>>>>>>> problem, but would leave the inconsistent behavior of state backends in
>>>>>>>>> place.
>>>>>>>>>
>>>>>>>>> What do others think?
>>>>>>>>>
>>>>>>>>> Cheers, Fabian
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
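For reference, below the quote: as I understand Aljoscha's longer-term direction, "keep the data serialised in the HeapStateBackend" would roughly mean something like the following sketch (my own illustration, not existing Flink code) -- no user object is shared between window panes, at the price of serializing on write and deserializing on read:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

/**
 * Sketch only: state kept as bytes on the heap, objects materialised on
 * access, similar in spirit to how the RocksDB backend behaves.
 */
class SerializedHeapValueSketch<T> {

    private final TypeSerializer<T> serializer;
    private byte[] bytes;   // the state, kept in serialised form

    SerializedHeapValueSketch(TypeSerializer<T> serializer) {
        this.serializer = serializer;
    }

    void update(T value) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        serializer.serialize(value, new DataOutputViewStreamWrapper(out));
        bytes = out.toByteArray();
    }

    T get() throws IOException {
        if (bytes == null) {
            return null;
        }
        // Every read produces a fresh object, so callers can mutate it freely.
        return serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)));
    }
}

That is the behaviour I would hope for instead of copying objects by default.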