Hi everybody,

When implementing a ReduceFunction for the incremental aggregation of SQL /
Table API window aggregates, we noticed that the HeapStateBackend does not
store copies but holds references to the original objects. In the case of a
SlidingWindow, the same object is referenced from different window panes.
Therefore, these objects cannot be modified in place (which we would like to
do in order to avoid object instantiations, see discussion [1]).
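To make the aliasing problem concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes a heap state that is just a HashMap storing the references it is given, and a reduce step that mutates its first argument in place. The classes `Acc` and `AliasingDemo` and the pane keys are hypothetical stand-ins, not Flink APIs. One record is assigned to two overlapping panes, and updating only pane A also changes pane B.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mutable accumulator record (not a Flink class).
final class Acc {
    long sum;
    Acc(long sum) { this.sum = sum; }
}

public class AliasingDemo {

    // Reduce step that mutates its first argument instead of allocating
    // a new object, as an object-reusing ReduceFunction would.
    static Acc reduceInPlace(Acc a, Acc b) {
        a.sum += b.sum;
        return a;
    }

    // Simulates a heap-based state that stores references without copying.
    static Map<String, Acc> simulate() {
        Map<String, Acc> heapState = new HashMap<>();
        Acc first = new Acc(1);
        // With sliding windows the same record lands in two overlapping panes;
        // a reference-holding state stores the very same object twice.
        heapState.put("pane-A", first);
        heapState.put("pane-B", first);

        // A second record arrives that belongs only to pane A.
        heapState.put("pane-A", reduceInPlace(heapState.get("pane-A"), new Acc(10)));
        return heapState;
    }

    public static void main(String[] args) {
        Map<String, Acc> state = simulate();
        System.out.println("pane-A sum = " + state.get("pane-A").sum); // 11
        // pane-B was never updated, yet it sees the mutation: 11 instead of 1.
        System.out.println("pane-B sum = " + state.get("pane-B").sum); // 11
    }
}
```

A backend that serializes values on write would have left pane B at 1. That is exactly the inconsistency between backends.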

Other state backends serialize their data, so the behavior is not
consistent across backends.
If we want to have light-weight tests, we have to create new objects in the
ReduceFunction, which causes unnecessary overhead.

I would propose to copy objects when storing them in a HeapStateBackend.
This would ensure that objects returned from state to the user behave
identically for all state backends.
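A minimal sketch of the proposed copy-on-store behavior, again in plain Java and under stated assumptions. `CopyingHeapState`, `Rec` and `CopyOnStoreDemo` are hypothetical classes, not Flink code. The `copier` function stands in for whatever copy mechanism a real backend would use; in Flink, a TypeSerializer's `copy` method is a natural candidate. The state copies each value on `put`. The same in-place reduce as before now leaves pane B untouched.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical mutable record type (not a Flink class).
final class Rec {
    long sum;
    Rec(long sum) { this.sum = sum; }
}

// Hypothetical heap-backed state that stores a copy of every value it is given,
// so two keys never share an object even if the caller passes the same one.
final class CopyingHeapState<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final UnaryOperator<V> copier; // stand-in for a serializer's copy()

    CopyingHeapState(UnaryOperator<V> copier) { this.copier = copier; }

    void put(K key, V value) { values.put(key, copier.apply(value)); }

    V get(K key) { return values.get(key); }
}

public class CopyOnStoreDemo {

    // Same object-reusing reduce step: mutates its first argument.
    static Rec reduceInPlace(Rec a, Rec b) {
        a.sum += b.sum;
        return a;
    }

    // Returns the final sums of pane A and pane B.
    static long[] simulate() {
        CopyingHeapState<String, Rec> state = new CopyingHeapState<>(r -> new Rec(r.sum));
        Rec first = new Rec(1);
        // Each pane receives its own copy of the shared input record.
        state.put("pane-A", first);
        state.put("pane-B", first);

        // Mutating pane A's copy in place no longer affects pane B.
        state.put("pane-A", reduceInPlace(state.get("pane-A"), new Rec(10)));
        return new long[] { state.get("pane-A").sum, state.get("pane-B").sum };
    }

    public static void main(String[] args) {
        long[] sums = simulate();
        System.out.println("pane-A sum = " + sums[0]); // 11
        System.out.println("pane-B sum = " + sums[1]); // 1
    }
}
```

The cost of this design is one copy per write. In return, the in-place reduce behaves the same way it would with a serializing backend.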

We created a related JIRA [2] that asks to copy records that go into an
incremental ReduceFunction. Its scope is narrower and would solve our
problem, but it would leave the inconsistent behavior of state backends in
place.

What do others think?

Cheers, Fabian

[1] https://github.com/apache/flink/pull/2792#discussion_r88653721
[2] https://issues.apache.org/jira/browse/FLINK-5105
