I have a related question. FsStateBackend keeps working state on the heap and only writes the checkpoint to the filesystem, so does JobManager/TaskManager memory limit the state size? Is the state size limited by the TM's memory * the number of TMs, or by the JM's memory?

Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Mon, Feb 8, 2021 at 6:05 PM:

> Hi,
>
> I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the
> value on update.
> As for "value()", it may (de)serialize it and return a copy if there is an
> ongoing async snapshot in progress (to protect from modifications). This
> shouldn't happen often though.
>
> Regards,
> Roman
>
>
> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:
>
>> Hi,
>>
>> MemoryStateBackend and FsStateBackend both hold keyed state in
>> HeapKeyedStateBackend [1], and the main structure to store data is
>> StateTable [2] which holds POJO format objects. That is to say, the object
>> would not be serialized when calling update().
>> On the other hand, RocksDB statebackend would store value with serialized
>> bytes.
>>
>> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>
>> Best
>> Yun Tang
>>
>> ------------------------------
>> *From:* Colletta, Edward <edward.colle...@fmr.com>
>> *Sent:* Sunday, February 7, 2021 19:53
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* question on ValueState
>>
>> Using FsStateBackend.
>>
>> I was under the impression that ValueState.value will serialize an object
>> which is stored in the local state backend, copy the serialized object and
>> deserializes it. Likewise update() would do the same steps copying the
>> object back to local state backend. And as a consequence, storing
>> collections in ValueState is much less efficient than using ListState or
>> MapState if possible.
>>
>> However, I am looking at some code I wrote a while ago which made the
>> assumption that the value() method just returned a reference to the
>> object. The code only calls update() when creating the object if value()
>> returns null. Yet the code works, all changes to the object stored in
>> state are visible the next time value() is called. I have some sample
>> code below.
>>
>> Can someone clarify what really happens when value() is called?
>>
>>     public void processElement(M in, Context ctx, Collector<Long> out)
>>             throws Exception {
>>         MyWindow myWindow;
>>         myWindow = windowState.value();
>>         if (myWindow == null) {
>>             ctx.timerService().registerProcessingTimeTimer(
>>                 ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>             myWindow = new MyWindow(0L, slide, windowSize);
>>             windowState.update(myWindow);
>>             myWindow.eq.add(0L);
>>         }
>>         myWindow.eq.getTail().setAccumulator(
>>             myWindow.eq.getTail().getAccumulator() + in.value);
>>     }
>>
>>     @Override
>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out)
>>             throws Exception {
>>         ctx.timerService().registerProcessingTimeTimer(
>>             ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>         MyWindow myWindow = windowState.value();
>>         myWindow.slide(0L);
>>         out.collect(myWindow.globalAccum);
>>     }
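
To make the difference described in the quoted thread concrete, here is a minimal sketch in plain Java. It does not use Flink at all: `HeapLikeState`, `SerializedLikeState`, and `Counter` are hypothetical stand-ins I made up, and Java serialization is used only as a substitute for whatever serializer a real backend applies. It models one store that keeps the object itself (as the thread says the heap-based backends do) and one that keeps serialized bytes (as the thread says RocksDB does), then mutates the returned object without calling update().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StateSemanticsSketch {

    /** Hypothetical stand-in for a user state object such as MyWindow. */
    static class Counter implements Serializable {
        private static final long serialVersionUID = 1L;
        long total;
    }

    /** Models a heap-style store: value() hands back the stored reference. */
    static class HeapLikeState<T> {
        private T current;

        T value() { return current; }

        void update(T v) { current = v; }
    }

    /** Models a serializing store: update() keeps bytes, value() builds a fresh copy. */
    static class SerializedLikeState<T extends Serializable> {
        private byte[] bytes;

        @SuppressWarnings("unchecked")
        T value() {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("deserialization failed", e);
            }
        }

        void update(T v) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(v);
            } catch (IOException e) {
                throw new IllegalStateException("serialization failed", e);
            }
            bytes = buf.toByteArray();
        }
    }

    /** Mutates the object returned by value() and never calls update() again. */
    static long mutateWithoutUpdateOnHeap() {
        HeapLikeState<Counter> state = new HeapLikeState<>();
        state.update(new Counter());
        state.value().total += 5;      // changes the stored object in place
        return state.value().total;
    }

    /** Same access pattern against the serializing store. */
    static long mutateWithoutUpdateOnSerialized() {
        SerializedLikeState<Counter> state = new SerializedLikeState<>();
        state.update(new Counter());
        state.value().total += 5;      // changes a throwaway copy only
        return state.value().total;
    }

    public static void main(String[] args) {
        System.out.println("heap-like: " + mutateWithoutUpdateOnHeap());
        System.out.println("serialized-like: " + mutateWithoutUpdateOnSerialized());
    }
}
```

In this toy model the heap-like store reports 5 and the serializing store reports 0, which matches why the sample code in the thread appears to work on FsStateBackend. If the thread's description holds, calling update() after every modification would keep such code independent of the backend choice.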