Hi,

Why do you think that this means "much code changes"?
I think it would actually be a pretty lightweight change in
HeapReducingState.

The proposal is to copy the *first* value that goes into a ReducingState.
The copy would be made by a TypeSerializer and hence be a deep copy.
This would allow the ReduceFunction to reuse and mutate that copy in each
invocation, instead of creating a new result object of the same type on
every call.

I think the savings from reusing the object in each invocation of the
ReduceFunction, rather than creating a new object each time, should
quickly amortize the one-time cost of the copy.
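To make the idea concrete, here is a minimal sketch. Note that the
TypeSerializer and ReduceFunction interfaces and the
FirstValueCopyingReducingState class below are simplified stand-ins written
for this example only, not Flink's actual interfaces or the real
HeapReducingState:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for Flink's TypeSerializer and ReduceFunction
// (hypothetical, for illustration only).
interface TypeSerializer<T> {
    T copy(T value); // deep copy
}

interface ReduceFunction<T> {
    T reduce(T accumulator, T value) throws Exception;
}

// Sketch of the proposed behaviour: only the *first* value added for a
// key is deep-copied; afterwards the ReduceFunction may mutate and return
// that copy in place, so no per-record result objects are created.
class FirstValueCopyingReducingState<K, T> {

    private final Map<K, T> state = new HashMap<>();
    private final TypeSerializer<T> serializer;
    private final ReduceFunction<T> reduceFunction;

    FirstValueCopyingReducingState(TypeSerializer<T> serializer,
                                   ReduceFunction<T> reduceFunction) {
        this.serializer = serializer;
        this.reduceFunction = reduceFunction;
    }

    void add(K key, T value) throws Exception {
        T current = state.get(key);
        if (current == null) {
            // One-time deep copy: the stored object is now owned by the
            // state and safe for the ReduceFunction to mutate later.
            state.put(key, serializer.copy(value));
        } else {
            // The function may update `current` in place and return it,
            // avoiding a new result object per invocation.
            state.put(key, reduceFunction.reduce(current, value));
        }
    }

    T get(K key) {
        return state.get(key);
    }
}
```

With a mutable accumulator type (e.g. an int[] counter), the ReduceFunction
can do `acc[0] += value[0]` and return `acc`: only the state-owned copy is
ever mutated, so the caller's original object stays untouched.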

Fabian

2016-11-23 3:04 GMT+01:00 sjk <shijinkui...@163.com>:

> Hi, Fabian
>
> That seems like a lot of code changes. Can you show us the key changes
> for the object copy?
> An object reference may itself hold deeper references, so a deep copy
> could be a bomb.
> Could we instead rebuild an object from its data, or use Kryo directly
> for object serialization?
> I would prefer not to copy objects.
>
>
> > On Nov 22, 2016, at 20:33, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > Does anybody have objections against copying the first record that goes
> > into the ReduceState?
> >
> > 2016-11-22 12:49 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >
> >> That's right, yes.
> >>
> >> On Mon, 21 Nov 2016 at 19:14 Fabian Hueske <fhue...@gmail.com> wrote:
> >>
> >>> Right, but that would be a much bigger change than "just" copying the
> >>> *first* record that goes into the ReduceState, or am I missing
> >>> something?
> >>>
> >>>
> >>> 2016-11-21 18:41 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> >>>
> >>>> To bring over my comment from the Github PR that started this
> >>>> discussion:
> >>>>
> >>>> @wuchong <https://github.com/wuchong>, yes this is a problem with the
> >>>> HeapStateBackend. The RocksDB backend does not suffer from this
> >>>> problem. I think in the long run we should migrate the
> >>>> HeapStateBackend to always keep data in serialised form, then we
> >>>> also won't have this problem anymore.
> >>>>
> >>>> So I'm very much in favour of keeping data serialised. Copying data
> >>>> would only ever be a stopgap solution.
> >>>>
> >>>> On Mon, 21 Nov 2016 at 15:56 Fabian Hueske <fhue...@gmail.com> wrote:
> >>>>
> >>>>> Another approach that would solve the problem for our use case
> >>>>> (object re-usage for incremental window ReduceFunctions) would be
> >>>>> to copy the first object that is put into the state.
> >>>>> This would be a change on the ReduceState, not on the overall state
> >>>>> backend, which should be feasible, no?
> >>>>>
> >>>>>
> >>>>>
> >>>>> 2016-11-21 15:43 GMT+01:00 Stephan Ewen <se...@apache.org>:
> >>>>>
> >>>>>> -1 for copying objects.
> >>>>>>
> >>>>>> Storing serialized data where possible is good, but copying all
> >>>>>> objects by default is not a good idea, in my opinion.
> >>>>>> A lot of scenarios use data types that are hellishly expensive to
> >>>>>> copy.
> >>>>>> Even the current copy on chain handover is a problem.
> >>>>>>
> >>>>>> Let's not introduce even more copies.
> >>>>>>
> >>>>>> On Mon, Nov 21, 2016 at 3:16 PM, Maciek Próchniak <m...@touk.pl>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> It will come with a performance overhead when updating the state,
> >>>>>>> but I think it will be possible to perform asynchronous snapshots
> >>>>>>> using the HeapStateBackend (probably some changes to the
> >>>>>>> underlying data structures would be needed), which would bring
> >>>>>>> more predictable performance.
> >>>>>>>
> >>>>>>> thanks,
> >>>>>>> maciek
> >>>>>>>
> >>>>>>>
> >>>>>>> On 21/11/2016 13:48, Aljoscha Krettek wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>> I would be in favour of this since it brings things in line with
> >>>>>>>> the RocksDB backend. This will, however, come with quite the
> >>>>>>>> performance overhead, depending on how fast the TypeSerializer
> >>>>>>>> can copy.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>> On Mon, 21 Nov 2016 at 11:30 Fabian Hueske <fhue...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi everybody,
> >>>>>>>>>
> >>>>>>>>> when implementing a ReduceFunction for incremental aggregation
> >>>>>>>>> of SQL / Table API window aggregates, we noticed that the
> >>>>>>>>> HeapStateBackend does not store copies but holds references to
> >>>>>>>>> the original objects. In case of a SlidingWindow, the same
> >>>>>>>>> object is referenced from different window panes. Therefore, it
> >>>>>>>>> is not possible to modify these objects (in order to avoid
> >>>>>>>>> object instantiations, see discussion [1]).
> >>>>>>>>>
> >>>>>>>>> Other state backends serialize their data, so the behavior is
> >>>>>>>>> not consistent across backends.
> >>>>>>>>> If we want to have light-weight tests, we have to create new
> >>>>>>>>> objects in the ReduceFunction, causing unnecessary overhead.
> >>>>>>>>>
> >>>>>>>>> I would propose to copy objects when storing them in a
> >>>>>>>>> HeapStateBackend. This would ensure that objects returned from
> >>>>>>>>> state to the user behave identically across state backends.
> >>>>>>>>>
> >>>>>>>>> We created a related JIRA [2] that asks to copy records that go
> >>>>>>>>> into an incremental ReduceFunction. Its scope is narrower and
> >>>>>>>>> would solve our problem, but it would leave the inconsistent
> >>>>>>>>> behavior of the state backends in place.
> >>>>>>>>>
> >>>>>>>>> What do others think?
> >>>>>>>>>
> >>>>>>>>> Cheers, Fabian
> >>>>>>>>>
> >>>>>>>>> [1] https://github.com/apache/flink/pull/2792#discussion_r88653721
> >>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-5105
> >>>>>>>>>
> >>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
>
