Hi,

MemoryStateBackend and FsStateBackend both hold keyed state in 
HeapKeyedStateBackend [1], and the main structure to store data is StateTable 
[2] which holds POJO format objects. That is to say, the object would not be 
serialized when calling update().
On the other hand, RocksDB statebackend would store value with serialized bytes.


[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java

Best
Yun Tang

________________________________
From: Colletta, Edward <edward.colle...@fmr.com>
Sent: Sunday, February 7, 2021 19:53
To: user@flink.apache.org <user@flink.apache.org>
Subject: question on ValueState


Using FsStateBackend.



I was under the impression that ValueState.value will serialize an object which 
is stored in the local state backend, copy the serialized object and 
deserializes it.  Likewise update() would do the same steps copying the object 
back to local state backend.    And as a consequence, storing collections in 
ValueState is much less efficient than using ListState or MapState if possible.



However, I am looking at some code I wrote a while ago which made the 
assumption that the value() method just returned a reference to the object.  
The code only calls update() when creating the object if value() returns null.  
  Yet the code works, all changes to the object stored in state are visible the 
next time value() is called.   I have some sample code below.



Can someone clarify what really happens when value() is called?





   public void processElement(M in, Context ctx, Collector<Long> out) throws 
Exception {

        MyWindow myWindow;

        myWindow = windowState.value();

        if (myWindow == null) {

            
ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
 + interval) / interval) * interval);

            myWindow = new MyWindow(0L, slide, windowSize);

            windowState.update(myWindow);

            myWindow.eq.add(0L);

        }

        
myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + 
in.value);

    }



    @Override

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> 
out) throws Exception {

        
ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime()
 + interval) / interval) * interval);

        MyWindow myWindow = windowState.value();

        myWindow.slide(0L);

        out.collect(myWindow.globalAccum);

    }




Reply via email to