I don't think it's a good idea to introduce this special-case fix for one
specific use case, because it can have implications for other parts of the
code. We should push for keeping the data in serialised form. There are also
other problems: for example, a ListState allows modifying the returned
Iterable, and this changes the actual contents of the state in the
HeapStateBackend, while in the RocksDB backend it does not change the
state.
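
To make the ListState point concrete, here is a minimal sketch in plain
Java. It does not use any Flink classes: HeapLikeListState and
SerialisedListState are hypothetical stand-ins, and Java serialisation is
only an assumed substitute for a TypeSerializer. It shows roughly why
mutating the returned Iterable is visible with an on-heap list but not
when the state is kept in serialised form.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ListStateAliasingSketch {

    /** Hypothetical stand-in for a heap backend: get() returns the stored list itself. */
    static final class HeapLikeListState {
        private final List<String> values = new ArrayList<>();
        void add(String value) { values.add(value); }
        Iterable<String> get() { return values; }
    }

    /** Hypothetical stand-in for a backend that keeps state as bytes. */
    static final class SerialisedListState {
        private byte[] bytes = serialise(new ArrayList<>());
        void add(String value) {
            List<String> current = deserialise(bytes);
            current.add(value);
            bytes = serialise(current);
        }
        // Every call deserialises a fresh list, so callers never see the stored data itself.
        Iterable<String> get() { return deserialise(bytes); }
    }

    static byte[] serialise(List<String> values) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(new ArrayList<>(values));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static List<String> deserialise(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (List<String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Removes the first element through the Iterable's own iterator. */
    static void removeFirst(Iterable<String> returned) {
        Iterator<String> it = returned.iterator();
        it.next();
        it.remove();
    }

    static int count(Iterable<String> values) {
        int n = 0;
        for (String ignored : values) {
            n++;
        }
        return n;
    }

    static int heapLikeCountAfterMutation() {
        HeapLikeListState state = new HeapLikeListState();
        state.add("a");
        state.add("b");
        removeFirst(state.get());   // mutates the stored list
        return count(state.get());
    }

    static int serialisedCountAfterMutation() {
        SerialisedListState state = new SerialisedListState();
        state.add("a");
        state.add("b");
        removeFirst(state.get());   // mutates only a deserialised copy
        return count(state.get());
    }

    public static void main(String[] args) {
        System.out.println("heap-like:  " + heapLikeCountAfterMutation());   // prints 1
        System.out.println("serialised: " + serialisedCountAfterMutation()); // prints 2
    }
}
```

The same user code ends up with one element in the first case and two in
the second, which is the kind of inconsistency that keeping data
serialised everywhere would remove.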

On Wed, 23 Nov 2016 at 10:11 Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> Why do you think that this means "much code changes"?
> I think it would actually be a pretty lightweight change in
> HeapReducingState.
>
> The proposal is to copy the *first* value that goes into a ReducingState.
> The copy would be done by a TypeSerializer and hence be a deep copy.
> This makes it possible to reuse the copy in each invocation of the
> ReduceFunction instead of creating a new result object of the same type
> that was initially copied.
>
> I think the savings of reusing the object in each invocation of the
> ReduceFunction and not creating a new object should amortize the one-time
> object copy.
>
> Fabian
>
> 2016-11-23 3:04 GMT+01:00 sjk <shijinkui...@163.com>:
>
> > Hi, Fabian
> >
> > So much code changes. Can you show us the key code changes for the object
> > copy?
> > An object reference may hold deeper references, which can be a bomb.
> > Can we recreate the object from its data, or directly use Kryo for object
> > serialization?
> > I would prefer not to copy objects.
> >
> >
> > > On Nov 22, 2016, at 20:33, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > Does anybody have objections against copying the first record that goes
> > > into the ReduceState?
> > >
> > > 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> > >
> > >> That's right, yes.
> > >>
> > >> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com> wrote:
> > >>
> > >>> Right, but that would be a much bigger change than "just" copying the
> > >>> *first* record that goes into the ReduceState, or am I missing something?
> > >>>
> > >>>
> > >>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> > >>>
> > >>>> To bring over my comment from the Github PR that started this
> > >>>> discussion:
> > >>>>
> > >>>> @wuchong <https://github.com/wuchong>, yes this is a problem with the
> > >>>> HeapStateBackend. The RocksDB backend does not suffer from this problem.
> > >>>> I think in the long run we should migrate the HeapStateBackend to always
> > >>>> keep data in serialised form, then we also won't have this problem
> > >>>> anymore.
> > >>>>
> > >>>> So I'm very much in favour of keeping data serialised. Copying data would
> > >>>> only ever be a stopgap solution.
> > >>>>
> > >>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>
> > >>>>> Another approach that would solve the problem for our use case (object
> > >>>>> re-usage for incremental window ReduceFunctions) would be to copy the
> > >>>>> first object that is put into the state.
> > >>>>> This would be a change on the ReduceState, not on the overall state
> > >>>>> backend, which should be feasible, no?
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> > >>>>>
> > >>>>>> -1 for copying objects.
> > >>>>>>
> > >>>>>> Storing serialized data where possible is good, but copying all objects
> > >>>>>> by default is not a good idea, in my opinion.
> > >>>>>> A lot of scenarios use data types that are hellishly expensive to copy.
> > >>>>>> Even the current copy on chain handover is a problem.
> > >>>>>>
> > >>>>>> Let's not introduce even more copies.
> > >>>>>>
> > >>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> it will come with performance overhead when updating the state, but I
> > >>>>>>> think it'll be possible to perform asynchronous snapshots using
> > >>>>>>> HeapStateBackend (probably some changes to underlying data structures
> > >>>>>>> would be needed) - which would bring more predictable performance.
> > >>>>>>>
> > >>>>>>> thanks,
> > >>>>>>> maciek
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
> > >>>>>>>
> > >>>>>>>> Hi,
> > >>>>>>>> I would be in favour of this since it brings things in line with the
> > >>>>>>>> RocksDB backend. This will, however, come with quite the performance
> > >>>>>>>> overhead, depending on how fast the TypeSerializer can copy.
> > >>>>>>>>
> > >>>>>>>> Cheers,
> > >>>>>>>> Aljoscha
> > >>>>>>>>
> > >>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi everybody,
> > >>>>>>>>>
> > >>>>>>>>> when implementing a ReduceFunction for incremental aggregation of SQL /
> > >>>>>>>>> Table API window aggregates we noticed that the HeapStateBackend does not
> > >>>>>>>>> store copies but holds references to the original objects. In case of a
> > >>>>>>>>> SlidingWindow, the same object is referenced from different window panes.
> > >>>>>>>>> Therefore, it is not possible to modify these objects (in order to avoid
> > >>>>>>>>> object instantiations, see discussion [1]).
> > >>>>>>>>>
> > >>>>>>>>> Other state backends serialize their data such that the behavior is not
> > >>>>>>>>> consistent across backends.
> > >>>>>>>>> If we want to have light-weight tests, we have to create new objects in
> > >>>>>>>>> the ReduceFunction causing unnecessary overhead.
> > >>>>>>>>>
> > >>>>>>>>> I would propose to copy objects when storing them in a HeapStateBackend.
> > >>>>>>>>> This would ensure that objects returned from state to the user behave
> > >>>>>>>>> identical for different state backends.
> > >>>>>>>>>
> > >>>>>>>>> We created a related JIRA [2] that asks to copy records that go into an
> > >>>>>>>>> incremental ReduceFunction. The scope is more narrow and would solve our
> > >>>>>>>>> problem, but would leave the inconsistent behavior of state backends in
> > >>>>>>>>> place.
> > >>>>>>>>>
> > >>>>>>>>> What do others think?
> > >>>>>>>>>
> > >>>>>>>>> Cheers, Fabian
> > >>>>>>>>>
> > >>>>>>>>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> > >>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
> >
>
