What I am interested in is whether I should replace the file state backend (FsStateBackend) with RocksDB. RocksDB is slower, although its state size can be very large. Currently my job's state is about 10 GB, and I use 10 TaskManagers on different machines, each with 100 GB of memory. I do not think I need RocksDB; is that right?
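
To make the sizing question concrete, here is a rough back-of-envelope sketch in Java. It assumes state is spread evenly across the TaskManagers, and it uses two made-up inputs that do not come from any measurement: `heapOverheadFactor` (how much larger the objects are on the heap than in their checkpointed form) and `usableFraction` (how much of a TaskManager's memory is really available for state). The class and method names are hypothetical.

```java
/** Back-of-envelope sizing for a heap-based state backend. All inputs are illustrative. */
final class HeapStateSizing {

    /**
     * Estimated heap needed for state on each TaskManager, assuming keys are
     * spread evenly. heapOverheadFactor is a hypothetical multiplier for
     * "objects on heap" versus "serialized bytes in the checkpoint".
     */
    static double perTaskManagerGb(double checkpointedStateGb,
                                   int taskManagers,
                                   double heapOverheadFactor) {
        if (taskManagers <= 0) {
            throw new IllegalArgumentException("taskManagers must be positive");
        }
        return checkpointedStateGb / taskManagers * heapOverheadFactor;
    }

    /** True if the estimate fits into the assumed usable share of TaskManager memory. */
    static boolean fitsOnHeap(double perTmGb, double tmMemoryGb, double usableFraction) {
        return perTmGb <= tmMemoryGb * usableFraction;
    }

    public static void main(String[] args) {
        // 10 GB of state and 10 TaskManagers with 100 GB each are the numbers from this mail;
        // the factor 4.0 and the fraction 0.5 are assumptions picked only for illustration.
        double perTm = perTaskManagerGb(10.0, 10, 4.0);
        System.out.println("estimated heap state per TM (GB): " + perTm);
        System.out.println("fits on heap: " + fitsOnHeap(perTm, 100.0, 0.5));
    }
}
```

With these illustrative inputs the estimate stays far below what each TaskManager has, but the real overhead depends on the object types held in state and should be checked against the heap usage of the running job.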

yidan zhao <hinobl...@gmail.com> wrote on Tue, Feb 9, 2021 at 3:50 PM:

> I have a related question.
> Since fileStateBackend uses heap as the state storage and the checkpoint
> is finally stored in the filesystem, so whether the JobManager/TaskManager
> memory will limit the state size? The state size is limited by TM's memory
> * number of TMs? or limited by JM's memory.
>
> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Mon, Feb 8, 2021 at 6:05 PM:
>
>> Hi,
>>
>> I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the
>> value on update.
>> As for "value()", it may (de)serialize it and return a copy if there is
>> an ongoing async snapshot in progress (to protect from modifications). This
>> shouldn't happen often though.
>>
>> Regards,
>> Roman
>>
>> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi,
>>>
>>> MemoryStateBackend and FsStateBackend both hold keyed state in
>>> HeapKeyedStateBackend [1], and the main structure to store data is
>>> StateTable [2] which holds POJO format objects. That is to say, the object
>>> would not be serialized when calling update().
>>> On the other hand, RocksDB statebackend would store value with
>>> serialized bytes.
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>>> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>>
>>> Best
>>> Yun Tang
>>>
>>> ------------------------------
>>> *From:* Colletta, Edward <edward.colle...@fmr.com>
>>> *Sent:* Sunday, February 7, 2021 19:53
>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>> *Subject:* question on ValueState
>>>
>>> Using FsStateBackend.
>>>
>>> I was under the impression that ValueState.value will serialize an
>>> object which is stored in the local state backend, copy the serialized
>>> object and deserializes it. Likewise update() would do the same steps
>>> copying the object back to local state backend. And as a consequence,
>>> storing collections in ValueState is much less efficient than using
>>> ListState or MapState if possible.
>>>
>>> However, I am looking at some code I wrote a while ago which made the
>>> assumption that the value() method just returned a reference to the
>>> object. The code only calls update() when creating the object if value()
>>> returns null. Yet the code works, all changes to the object stored in
>>> state are visible the next time value() is called. I have some sample
>>> code below.
>>>
>>> Can someone clarify what really happens when value() is called?
>>>
>>>     public void processElement(M in, Context ctx, Collector<Long> out) throws Exception {
>>>         MyWindow myWindow;
>>>         myWindow = windowState.value();
>>>         if (myWindow == null) {
>>>             ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>             myWindow = new MyWindow(0L, slide, windowSize);
>>>             windowState.update(myWindow);
>>>             myWindow.eq.add(0L);
>>>         }
>>>         myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
>>>     }
>>>
>>>     @Override
>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>>>         ctx.timerService().registerProcessingTimeTimer(((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>         MyWindow myWindow = windowState.value();
>>>         myWindow.slide(0L);
>>>         out.collect(myWindow.globalAccum);
>>>     }
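
The difference described in the thread (a heap backend keeps the object itself, RocksDB keeps serialized bytes) can be illustrated without Flink. The sketch below is plain Java and is not Flink's implementation: `HeapStore`, `SerializedStore` and `Window` are hypothetical stand-ins, and `java.io` serialization is used only as a substitute for Flink's own type serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

final class StateSemanticsSketch {

    /** Toy state object; a stand-in, not the MyWindow class from the question. */
    static final class Window implements Serializable {
        private static final long serialVersionUID = 1L;
        long accumulator;
    }

    /** Heap-style store: keeps the object, so value() hands back the same reference. */
    static final class HeapStore {
        private final Map<String, Window> table = new HashMap<>();
        Window value(String key) { return table.get(key); }
        void update(String key, Window w) { table.put(key, w); }
    }

    /** Byte-style store: keeps serialized bytes, so value() always returns a fresh copy. */
    static final class SerializedStore {
        private final Map<String, byte[]> table = new HashMap<>();

        Window value(String key) {
            byte[] bytes = table.get(key);
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (Window) in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        void update(String key, Window w) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(w);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            table.put(key, buf.toByteArray());
        }
    }

    /** Mutates the object after update() and reads it back from the heap-style store. */
    static long mutateAfterUpdateHeap() {
        HeapStore store = new HeapStore();
        Window w = new Window();
        store.update("k", w);
        w.accumulator += 5; // no second update(): the store still sees the change
        return store.value("k").accumulator;
    }

    /** Same steps against the byte-style store: the later mutation is lost. */
    static long mutateAfterUpdateSerialized() {
        SerializedStore store = new SerializedStore();
        Window w = new Window();
        store.update("k", w);
        w.accumulator += 5; // only the local object changes, the stored bytes do not
        return store.value("k").accumulator;
    }

    public static void main(String[] args) {
        System.out.println("heap-style store sees:  " + mutateAfterUpdateHeap());
        System.out.println("byte-style store sees:  " + mutateAfterUpdateSerialized());
    }
}
```

In this toy model the pattern from the question (call update() once, then keep mutating the object) only works with the heap-style store, which suggests that such code would behave differently after a switch to a backend that stores serialized bytes.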