Right, in this case FsStateBackend is the right choice.
The state size is limited by TM memory as you said.
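For a rough sanity check of that limit with the numbers quoted in this thread (about 10 GB of state, 10 TaskManagers, 100 GB each), here is a minimal Java sketch. It is only back-of-the-envelope arithmetic, not anything from Flink itself: the class and method names are made up, it assumes keys are spread evenly across TMs, and the heap overhead factor (objects on the heap usually take more room than their serialized checkpoint form) and the usable-memory fraction are guesses you would need to measure for your own job.

```java
/** Back-of-the-envelope estimate for heap-based keyed state. Hypothetical helper, not a Flink API. */
final class HeapStateEstimate {

    /**
     * Estimated heap needed per TaskManager, in GB.
     * Assumes an even key distribution and that live objects take
     * heapOverheadFactor times their serialized (checkpoint) size.
     */
    static double perTaskManagerGb(double checkpointSizeGb, int taskManagers, double heapOverheadFactor) {
        if (taskManagers <= 0) {
            throw new IllegalArgumentException("taskManagers must be positive");
        }
        return checkpointSizeGb * heapOverheadFactor / taskManagers;
    }

    /** True if the estimated share fits into the assumed usable fraction of TM memory. */
    static boolean fitsOnHeap(double perTmStateGb, double tmMemoryGb, double usableFraction) {
        return perTmStateGb <= tmMemoryGb * usableFraction;
    }

    public static void main(String[] args) {
        // 10 GB checkpointed state, 10 TMs, assumed 3x heap overhead.
        double perTm = perTaskManagerGb(10.0, 10, 3.0);
        System.out.println("Estimated state per TM (GB): " + perTm);
        // Assume only half of each TM's 100 GB is available for state objects.
        System.out.println("Fits on heap: " + fitsOnHeap(perTm, 100.0, 0.5));
    }
}
```

With these assumed inputs the per-TM share comes out at a few GB, far below 100 GB, which is consistent with staying on the heap-based backend.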

Regards,
Roman


On Tue, Feb 9, 2021 at 8:54 AM yidan zhao <hinobl...@gmail.com> wrote:

> What I am interested in is whether I should replace the file backend
> with RocksDB.
> RocksDB's performance is not as good, but its state size can be very large.
> Currently, my job's state is about 10 GB, and I use 10 TaskManagers on
> different machines, each with 100 GB of memory. I do not think I should use
> RocksDB; is that right?
>
> On Tue, Feb 9, 2021 at 3:50 PM yidan zhao <hinobl...@gmail.com> wrote:
>
>> I have a related question.
>> Since FsStateBackend keeps working state on the heap and only the
>> checkpoint is written to the filesystem, does JobManager/TaskManager
>> memory limit the state size? Is the state size limited by the TM's memory
>> * number of TMs, or by the JM's memory?
>>
>>
>> On Mon, Feb 8, 2021 at 6:05 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I think Yun Tang is right, HeapStateBackend doesn't (de)serialize the
>>> value on update.
>>> As for "value()", it may (de)serialize it and return a copy if there is
>>> an async snapshot in progress (to protect the snapshot from modifications).
>>> This shouldn't happen often though.
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Feb 8, 2021 at 3:24 AM Yun Tang <myas...@live.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> MemoryStateBackend and FsStateBackend both hold keyed state in
>>>> HeapKeyedStateBackend [1], and the main structure to store data is
>>>> StateTable [2], which holds plain Java objects. That is to say, the object
>>>> is not serialized when calling update().
>>>> On the other hand, the RocksDB state backend stores values as
>>>> serialized bytes.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
>>>> [2]
>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
>>>>
>>>> Best
>>>> Yun Tang
>>>>
>>>> ------------------------------
>>>> *From:* Colletta, Edward <edward.colle...@fmr.com>
>>>> *Sent:* Sunday, February 7, 2021 19:53
>>>> *To:* user@flink.apache.org <user@flink.apache.org>
>>>> *Subject:* question on ValueState
>>>>
>>>>
>>>> Using FsStateBackend.
>>>>
>>>>
>>>>
>>>> I was under the impression that ValueState.value() serializes the
>>>> object stored in the local state backend, copies the serialized
>>>> object, and deserializes it.  Likewise, update() would do the same steps,
>>>> copying the object back to the local state backend.  As a consequence,
>>>> storing collections in ValueState would be much less efficient than using
>>>> ListState or MapState where possible.
>>>>
>>>>
>>>>
>>>> However, I am looking at some code I wrote a while ago which made the
>>>> assumption that the value() method just returned a reference to the
>>>> object.  The code only calls update() when creating the object, if value()
>>>> returns null.  Yet the code works: all changes to the object stored in
>>>> state are visible the next time value() is called.  I have some sample
>>>> code below.
>>>>
>>>>
>>>>
>>>> Can someone clarify what really happens when value() is called?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>     public void processElement(M in, Context ctx, Collector<Long> out) throws Exception {
>>>>         MyWindow myWindow;
>>>>         myWindow = windowState.value();
>>>>         if (myWindow == null) {
>>>>             ctx.timerService().registerProcessingTimeTimer(
>>>>                 ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>>             myWindow = new MyWindow(0L, slide, windowSize);
>>>>             windowState.update(myWindow);
>>>>             myWindow.eq.add(0L);
>>>>         }
>>>>         myWindow.eq.getTail().setAccumulator(myWindow.eq.getTail().getAccumulator() + in.value);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>>>>         ctx.timerService().registerProcessingTimeTimer(
>>>>             ((ctx.timerService().currentProcessingTime() + interval) / interval) * interval);
>>>>         MyWindow myWindow = windowState.value();
>>>>         myWindow.slide(0L);
>>>>         out.collect(myWindow.globalAccum);
>>>>     }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
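The heap-versus-serialized distinction discussed in this thread can be illustrated with a small stand-alone Java sketch. It does not use Flink at all: SimpleState, HeapLikeState and SerializedLikeState are hypothetical stand-ins, and plain java.io serialization is used only as a placeholder for whatever serializer a real backend applies. The point is just to show why in-place changes made without update() are visible when the backend keeps the object itself, and would not be when it keeps bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

final class ValueStateSemanticsSketch {

    /** Minimal stand-in for a value-state handle (hypothetical, not Flink's ValueState). */
    interface SimpleState<T> {
        T value();
        void update(T v);
    }

    /** Heap-style: stores the object itself, so value() returns the same reference. */
    static final class HeapLikeState<T> implements SimpleState<T> {
        private T current;
        public T value() { return current; }
        public void update(T v) { current = v; }
    }

    /** Serialized-style: stores bytes, so every value() call builds a fresh copy. */
    static final class SerializedLikeState<T extends Serializable> implements SimpleState<T> {
        private byte[] bytes;

        @SuppressWarnings("unchecked")
        public T value() {
            if (bytes == null) {
                return null;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (T) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        public void update(T v) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(v);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
            bytes = buf.toByteArray(); // read only after the stream is closed and flushed
        }
    }

    /** Stores an empty list, mutates it in place without update(), and reports the stored size. */
    static int sizeAfterMutationWithoutUpdate(SimpleState<ArrayList<Long>> state) {
        state.update(new ArrayList<>());
        state.value().add(42L); // in-place change, no update() afterwards
        return state.value().size();
    }

    public static void main(String[] args) {
        System.out.println("heap-like: "
            + sizeAfterMutationWithoutUpdate(new HeapLikeState<ArrayList<Long>>()));       // 1
        System.out.println("serialized-like: "
            + sizeAfterMutationWithoutUpdate(new SerializedLikeState<ArrayList<Long>>())); // 0
    }
}
```

In the heap-like case the mutation is seen on the next value() call, which matches the behaviour observed with FsStateBackend. In the serialized-like case it is lost, so code that may later run on a backend that stores serialized bytes, such as RocksDB, would need to call update() after each change.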
