Right, in this case the FsStateBackend (file system state backend) is the right choice. As you said, the state size is limited by the TaskManagers' heap memory.
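"Limited by TM memory" applies per subtask, not per cluster. With a heap-based backend, each parallel subtask holds only the key groups assigned to it. The sketch below estimates each subtask's share of the 10GB state. It assumes a max parallelism of 128, a keyed operator parallelism of 10 (one subtask per TaskManager, as in yidan's setup), and state spread evenly over key groups (no key skew). The mapping `keyGroup * parallelism / maxParallelism` is, to my understanding, the integer formula Flink uses to assign key groups to operator indices. The class and method names are hypothetical.

```java
import java.util.Arrays;

public class KeyGroupShare {
    // Hypothetical helper mirroring the key-group-to-subtask mapping
    // (integer division): subtask = keyGroup * parallelism / maxParallelism.
    static int subtaskForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Counts how many key groups each subtask owns.
    static int[] keyGroupsPerSubtask(int parallelism, int maxParallelism) {
        int[] counts = new int[parallelism];
        for (int kg = 0; kg < maxParallelism; kg++) {
            counts[subtaskForKeyGroup(kg, parallelism, maxParallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 10;       // assumption: one keyed subtask per TaskManager
        int maxParallelism = 128;   // assumption: max parallelism of 128
        double totalStateGb = 10.0; // total state size mentioned in the thread

        int[] counts = keyGroupsPerSubtask(parallelism, maxParallelism);
        System.out.println(Arrays.toString(counts));
        for (int i = 0; i < counts.length; i++) {
            // Assumes every key group holds the same amount of state.
            double shareGb = totalStateGb * counts[i] / maxParallelism;
            System.out.printf("subtask %d: %d key groups, ~%.2f GB%n", i, counts[i], shareGb);
        }
    }
}
```

With these assumptions, each subtask owns 12 or 13 of the 128 key groups, so roughly 1GB of the 10GB. Keep in mind that live objects on the heap can take noticeably more memory than their serialized checkpoint size, and skewed keys concentrate state on fewer subtasks.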
Regards,
Roman

On Tue, Feb 9, 2021 at 8:54 AM yidan zhao <hinobl...@gmail.com> wrote:
> What I am interested in is whether I should use RocksDB to replace the
> FsStateBackend.
> RocksDB's performance is not as good, although its state size can be very
> large. Currently, my job's state is about 10GB, and I use 10 TaskManagers on
> different machines, each with 100G of memory. I do not think I should use
> RocksDB. Is that right?
>
> yidan zhao <hinobl...@gmail.com> wrote on Tue, Feb 9, 2021 at 3:50 PM:
>
>> I have a related question.
>> Since the FsStateBackend uses the heap as the state storage and the
>> checkpoint is finally stored in the file system, does the
>> JobManager/TaskManager memory limit the state size? Is the state size
>> limited by TM memory * number of TMs, or by the JM's memory?
>>
>>
>> Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Mon, Feb 8, 2021 at 6:05 PM:
>>
>>> Hi,
>>>
>>> I think Yun Tang is right: the heap-based state backend doesn't
>>> (de)serialize the value on update.
>>> As for "value()", it may (de)serialize it and return a copy if an async
>>> snapshot is in progress (to protect the snapshot from modifications).
>>> This shouldn't happen often, though.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> MemoryStateBackend and FsStateBackend both hold keyed state in
>>>> HeapKeyedStateBackend [1], and the main data structure is
>>>> StateTable [2], which holds the objects themselves (POJO form). That is
>>>> to say, the object is not serialized when update() is called.
>>>> The RocksDB state backend, on the other hand, stores values as
>>>> serialized bytes.
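The difference Yun Tang describes can be shown without Flink. The sketch below contrasts two stores. The first is a heap-style store that keeps object references, as a StateTable does. The second keeps only serialized bytes, as RocksDB does. Plain Java serialization stands in for Flink's serializers. `HeapValueState` and `BytesValueState` are hypothetical names, not Flink APIs.

```java
import java.io.*;
import java.util.ArrayList;

public class ValueStateSemantics {
    // Hypothetical heap-style state: keeps the object reference itself.
    static class HeapValueState<T> {
        private T value;
        T value() { return value; }     // returns the stored reference
        void update(T v) { value = v; } // stores the reference, no serialization
    }

    // Hypothetical RocksDB-style state: keeps only serialized bytes.
    static class BytesValueState<T extends Serializable> {
        private byte[] bytes;

        @SuppressWarnings("unchecked")
        T value() throws IOException, ClassNotFoundException {
            if (bytes == null) return null;
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject(); // every read yields a fresh copy
            }
        }

        void update(T v) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(v);
            }
            bytes = buf.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        HeapValueState<ArrayList<Long>> heap = new HeapValueState<>();
        heap.update(new ArrayList<>());
        heap.value().add(1L);                          // mutate without update()
        System.out.println("heap:  " + heap.value());  // prints "heap:  [1]"

        BytesValueState<ArrayList<Long>> bytes = new BytesValueState<>();
        bytes.update(new ArrayList<>());
        bytes.value().add(1L);                         // mutates a temporary copy only
        System.out.println("bytes: " + bytes.value()); // prints "bytes: []"

        ArrayList<Long> copy = bytes.value();
        copy.add(1L);
        bytes.update(copy);                            // write the change back explicitly
        System.out.println("bytes after update(): " + bytes.value()); // prints "bytes after update(): [1]"
    }
}
```

This is why code that mutates the result of value() without calling update() works with the MemoryStateBackend/FsStateBackend. With RocksDB, the same code silently loses the changes. An update() call after each mutation works with every backend.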
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>>>> [2]
>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>>>
>>>> Best
>>>> Yun Tang
>>>>
>>>> ------------------------------
>>>> *From:* Colletta, Edward <edward.colle...@fmr.com>
>>>> *Sent:* Sunday, February 7, 2021 19:53
>>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>>> *Subject:* question on ValueState
>>>>
>>>> Using FsStateBackend.
>>>>
>>>> I was under the impression that ValueState.value() serializes the
>>>> object stored in the local state backend, copies the serialized bytes,
>>>> and deserializes them. Likewise, update() would do the same steps,
>>>> copying the object back to the local state backend. As a consequence,
>>>> storing collections in ValueState would be much less efficient than
>>>> using ListState or MapState where possible.
>>>>
>>>> However, I am looking at some code I wrote a while ago that assumed
>>>> the value() method just returns a reference to the object. The code
>>>> calls update() only when creating the object, if value() returns null.
>>>> Yet the code works: all changes to the object stored in state are
>>>> visible the next time value() is called. I have some sample code below.
>>>>
>>>> Can someone clarify what really happens when value() is called?
>>>>
>>>> @Override
>>>> public void processElement(M in, Context ctx, Collector<Long> out)
>>>>         throws Exception {
>>>>     MyWindow myWindow = windowState.value();
>>>>     if (myWindow == null) {
>>>>         // Fire at the next multiple of `interval` after the current time.
>>>>         ctx.timerService().registerProcessingTimeTimer(
>>>>                 ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>>         myWindow = new MyWindow(0L, slide, windowSize);
>>>>         // update() is called only once, when the object is created.
>>>>         windowState.update(myWindow);
>>>>         myWindow.eq.add(0L);
>>>>     }
>>>>     // Mutates the stored object in place; no update() follows, so this
>>>>     // relies on the heap backend returning a reference from value().
>>>>     myWindow.eq.getTail().setAccumulator(
>>>>             myWindow.eq.getTail().getAccumulator() + in.value);
>>>> }
>>>>
>>>> @Override
>>>> public void onTimer(long timestamp, OnTimerContext ctx,
>>>>         Collector<Long> out) throws Exception {
>>>>     ctx.timerService().registerProcessingTimeTimer(
>>>>             ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>>     MyWindow myWindow = windowState.value();
>>>>     myWindow.slide(0L);
>>>>     out.collect(myWindow.globalAccum);
>>>> }
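Roman's remark about value() returning a copy during an async snapshot also helps explain why the code above keeps working. The sketch below is a heavily simplified, single-entry model of copy-on-write heap state with version counters. It is loosely modeled on the idea behind the heap backend's copy-on-write state map. `CowValueState`, `startSnapshot` and `releaseSnapshot` are hypothetical names, and a plain copy function stands in for a serializer's copy(). When the value is read during a snapshot, the live entry is replaced by a copy and that copy is returned. The snapshot keeps the old object, and later in-place mutations still land in the live state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class CowValueStateDemo {
    // Hypothetical single-entry model of copy-on-write heap state.
    static class CowValueState<T> {
        private final UnaryOperator<T> copier; // stands in for a serializer's copy()
        private T state;
        private int stateVersion;    // map version when `state` was last written or copied
        private int mapVersion;      // incremented each time a snapshot starts
        private int requiredVersion; // 0 means no snapshot is in progress

        CowValueState(UnaryOperator<T> copier) { this.copier = copier; }

        T value() {
            if (state != null && stateVersion < requiredVersion) {
                // A running snapshot may still reference the current object:
                // install a private copy in the live state and return that copy.
                state = copier.apply(state);
                stateVersion = mapVersion;
            }
            return state;
        }

        void update(T v) {
            state = v;
            stateVersion = mapVersion;
        }

        // Starts a snapshot and returns the object the snapshot would write out.
        T startSnapshot() {
            mapVersion++;
            requiredVersion = mapVersion;
            return state;
        }

        void releaseSnapshot() { requiredVersion = 0; }
    }

    public static void main(String[] args) {
        // A shallow list copy is enough here because Long values are immutable.
        CowValueState<List<Long>> window = new CowValueState<>(l -> new ArrayList<>(l));
        List<Long> initial = new ArrayList<>();
        initial.add(0L);
        window.update(initial);

        List<Long> snapshot = window.startSnapshot(); // async snapshot begins
        List<Long> live = window.value();             // the copy is made here
        live.add(5L);                                 // mutate without update()

        System.out.println("snapshot: " + snapshot);       // prints "snapshot: [0]"
        System.out.println("live:     " + window.value()); // prints "live:     [0, 5]"
        window.releaseSnapshot();
    }
}
```

In this model, an entry is copied at most once per snapshot, which is consistent with Roman's note that the copy shouldn't happen often. Because the copy replaces the live value, mutating the returned object without update() still affects later reads.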