Hi,

I think Yun Tang is right: the heap-based backend (HeapKeyedStateBackend) doesn't (de)serialize the value on update(). As for value(), it may (de)serialize the value and return a copy if an asynchronous snapshot is in progress (to protect the snapshot from modifications). This shouldn't happen often, though.
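One way to picture the snapshot case: a table that normally hands out live references but, while a snapshot is running, copies an entry the snapshot still points to before returning it. The sketch below is a toy model with hypothetical names (CopyOnReadModel, startSnapshot, finishSnapshot). It is not Flink's actual heap state map, and `new ArrayList<>(...)` stands in for copying with the state serializer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (NOT Flink's implementation) of a heap state table that returns live references,
 * except while a snapshot is in progress: then an entry the snapshot still points to is
 * copied before it is handed out, so later mutations cannot leak into the snapshot.
 */
public class CopyOnReadModel {

    private static final class Slot {
        List<Long> value;
        int version; // table version at which this value object was installed

        Slot(List<Long> value, int version) {
            this.value = value;
            this.version = version;
        }
    }

    private final Map<String, Slot> table = new HashMap<>();
    private int currentVersion = 0;
    private int snapshotVersion = -1; // -1: no snapshot in progress
    private Map<String, List<Long>> snapshot;

    void update(String key, List<Long> value) {
        table.put(key, new Slot(value, currentVersion));
    }

    List<Long> value(String key) {
        Slot s = table.get(key);
        if (s == null) {
            return null;
        }
        if (snapshotVersion >= 0 && s.version <= snapshotVersion) {
            // The running snapshot still references s.value: install and return a private copy.
            s.value = new ArrayList<>(s.value);
            s.version = currentVersion;
        }
        return s.value;
    }

    /** Starts a snapshot by capturing references to the current values (no copying yet). */
    void startSnapshot() {
        snapshot = new HashMap<>();
        for (Map.Entry<String, Slot> e : table.entrySet()) {
            snapshot.put(e.getKey(), e.getValue().value);
        }
        snapshotVersion = currentVersion;
        currentVersion++;
    }

    /** Ends the snapshot and returns what it captured. */
    Map<String, List<Long>> finishSnapshot() {
        Map<String, List<Long>> captured = snapshot;
        snapshot = null;
        snapshotVersion = -1;
        return captured;
    }

    public static void main(String[] args) {
        CopyOnReadModel state = new CopyOnReadModel();
        List<Long> original = new ArrayList<>();
        state.update("k", original);

        System.out.println(state.value("k") == original); // true: no snapshot, live reference

        state.startSnapshot();
        List<Long> copy = state.value("k");                // copied: snapshot holds 'original'
        copy.add(1L);                                      // mutate without calling update()
        System.out.println(copy == original);              // false
        System.out.println(state.value("k").size());       // 1: the copy is now the live value
        System.out.println(state.finishSnapshot().get("k").size()); // 0: snapshot unaffected
    }
}
```

In this model the copy replaces the table entry, so mutating the returned object still changes the live state; only the snapshot keeps the old object.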
Regards,
Roman

On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:
> Hi,
>
> MemoryStateBackend and FsStateBackend both hold keyed state in
> HeapKeyedStateBackend [1], whose main data structure is StateTable [2],
> which holds objects in POJO form. That is to say, the object is not
> serialized when update() is called.
> The RocksDB state backend, on the other hand, stores values as serialized
> bytes.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
> [2]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>
> Best
> Yun Tang
>
> ------------------------------
> *From:* Colletta, Edward <edward.colle...@fmr.com>
> *Sent:* Sunday, February 7, 2021 19:53
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* question on ValueState
>
> Using FsStateBackend.
>
> I was under the impression that ValueState.value() would serialize the
> object stored in the local state backend, copy the serialized bytes, and
> deserialize them. Likewise, update() would perform the same steps to copy
> the object back to the local state backend. As a consequence, storing
> collections in ValueState would be much less efficient than using ListState
> or MapState where possible.
>
> However, I am looking at some code I wrote a while ago which assumed that
> the value() method just returns a reference to the object. The code only
> calls update() when creating the object, i.e. when value() returns null.
> Yet the code works: all changes to the object stored in state are visible
> the next time value() is called. I have some sample code below.
>
> Can someone clarify what really happens when value() is called?
>
> @Override
> public void processElement(M in, Context ctx, Collector<Long> out)
>         throws Exception {
>     MyWindow myWindow;
>     myWindow = windowState.value();
>     if (myWindow == null) {
>         // Fire at the next multiple of interval after the current processing time
>         ctx.timerService().registerProcessingTimeTimer(
>             ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>         myWindow = new MyWindow(0L, slide, windowSize);
>         windowState.update(myWindow);
>         myWindow.eq.add(0L); // mutates the object after update()
>     }
>     // Mutates the object returned by value() without calling update() again
>     myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out)
>         throws Exception {
>     ctx.timerService().registerProcessingTimeTimer(
>         ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>     MyWindow myWindow = windowState.value();
>     myWindow.slide(0L);
>     out.collect(myWindow.globalAccum);
> }
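To make the difference Yun describes concrete, here is a toy model (not Flink code) of two storage strategies: one keeps the object reference, as the heap backends' StateTable does, and one keeps only serialized bytes, loosely like RocksDB. All names (SimpleValueState, ReferenceState, BytesState) are hypothetical, Java serialization stands in for Flink's serializers, and the snapshot-time copy mentioned above is ignored. `mutateWithoutUpdate` follows the shape of the question's processElement (update() once on creation, then mutate); `mutateThenUpdate` writes the object back after every change.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

/** Toy model (NOT Flink code) of two ways a keyed-state backend can hold a value. */
public class StateStorageModel {

    /** Hypothetical minimal stand-in for ValueState<T> (not the real interface). */
    interface SimpleValueState<T> {
        T value();
        void update(T v);
    }

    /** Keeps the object reference: no serialization on update() or value(). */
    static final class ReferenceState<T> implements SimpleValueState<T> {
        private T stored;
        public T value() { return stored; }
        public void update(T v) { stored = v; }
    }

    /** Keeps serialized bytes: update() serializes, value() deserializes a fresh copy. */
    static final class BytesState<T extends Serializable> implements SimpleValueState<T> {
        private byte[] stored;

        public void update(T v) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(v);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            stored = bos.toByteArray();
        }

        @SuppressWarnings("unchecked")
        public T value() {
            if (stored == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
                return (T) in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Question's pattern: update() once on creation, then mutate without calling update(). */
    static int mutateWithoutUpdate(SimpleValueState<ArrayList<Long>> state) {
        ArrayList<Long> list = state.value();
        if (list == null) {
            list = new ArrayList<>();
            state.update(list);  // stored while still empty
        }
        list.add(42L);           // no second update() call
        return state.value().size();
    }

    /** Portable pattern: read, mutate, then always write the object back. */
    static int mutateThenUpdate(SimpleValueState<ArrayList<Long>> state) {
        ArrayList<Long> list = state.value();
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(42L);
        state.update(list);      // write back after every mutation
        return state.value().size();
    }

    public static void main(String[] args) {
        System.out.println(mutateWithoutUpdate(new ReferenceState<>())); // 1
        System.out.println(mutateWithoutUpdate(new BytesState<>()));     // 0
        System.out.println(mutateThenUpdate(new ReferenceState<>()));    // 1
        System.out.println(mutateThenUpdate(new BytesState<>()));        // 1
    }
}
```

In this model the question's pattern only works with reference storage; with byte storage the mutation is lost because value() hands back a deserialized copy. Calling update() after each mutation gives the same result with both, which is why it is the safer habit if the job might later move to a backend that stores bytes.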