Hi Gabor,

I thought so. I was hoping for a way to read the savepoint in pages,
instead of as a single blob up front, which I think is what the HashMap
backend does. In that scenario we just want to be called for each entry
and extract the bit we want.
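
To make that concrete, here is a rough sketch in plain Java of the access
pattern I had in mind. Nothing in it is Flink API: PagedExtract, forEachPage
and the page size are made-up names for illustration only, and it assumes
the reader could hand us the entries through a lazy iterator rather than
after loading everything.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;

// Hypothetical helper, not part of Flink: illustrates per-entry extraction
// with bounded memory.
final class PagedExtract {

    // Pulls entries one at a time, keeps only the extracted bit, and hands
    // the results to the sink in pages of at most pageSize items.
    // Returns the number of entries seen.
    static <T, R> long forEachPage(Iterator<T> entries, Function<T, R> extract,
                                   int pageSize, Consumer<List<R>> sink) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<R> page = new ArrayList<>(pageSize);
        long seen = 0;
        while (entries.hasNext()) {
            page.add(extract.apply(entries.next()));
            seen++;
            if (page.size() == pageSize) {
                sink.accept(page);
                // Fresh list, so the sink may keep the old page if it wants to.
                page = new ArrayList<>(pageSize);
            }
        }
        if (!page.isEmpty()) {
            sink.accept(page); // last, possibly shorter, page
        }
        return seen;
    }

    public static void main(String[] args) {
        // Stand-in for state entries: lazily generated "key:value" strings.
        Iterator<String> entries = IntStream.range(0, 10)
                .mapToObj(i -> "key" + i + ":value" + i)
                .iterator();
        long n = PagedExtract.<String, String>forEachPage(
                entries,
                (String e) -> e.substring(0, e.indexOf(':')), // keep only the key
                4,
                (List<String> page) -> System.out.println(page));
        System.out.println(n + " entries");
    }
}
```

With something like this, at most one page of extracted values is held at a
time, however large the source is. Whether the savepoint reader can feed it
lazily is exactly the open question.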

Never mind

Thank you for the insight. Saves me a lot of hunting for nothing.

JM

On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Jean-Marc,
>
> We've already realized that the RocksDB approach is not reaching the
> performance it should. There is an open issue for it [1].
> The HashMap-based approach has always needed more memory. So if
> the memory footprint is a hard requirement, RocksDB is the only way for now.
>
> Sad to say, but I can't suggest any nifty trick to make it better. All I
> can promise is that I'm now measuring the performance of the RocksDB
> approach and intend to eliminate the slowness. Since we don't know what
> exactly causes the slowness, the new Frocksdb-8.10.0 may also be an
> improvement.
>
> All in all it will take some time to sort this out.
>
> [1] https://issues.apache.org/jira/browse/FLINK-37109
>
> BR,
> G
>
>
> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com>
> wrote:
>
>> What would be the best approach to read a savepoint while minimising
>> memory consumption? We just need to transform it into something else for
>> investigation.
>>
>> Our Flink 1.20 streaming job uses the HashMap backend and is spread over
>> 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint
>> can be 4-5 GB or more.
>>
>> The reader is more basic, using a local execution environment. This is
>> essentially what we are doing:
>>
>>     StreamExecutionEnvironment env =
>>         LocalStreamEnvironment.getExecutionEnvironment();
>>     env.setParallelism(1);
>>
>>     SavepointReader savepoint =
>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>
>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>         savepoint.readKeyedState(
>>             MAIN_OPERATOR,
>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>
>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>         mainOperatorState.executeAndCollect();
>>     stateReader.forEachRemaining(record -> { ...
>>         // extract what we need here
>>     });
>>
>>
>> We tried two approaches:
>> - One is to read the savepoint with a RocksDB backend. That works and is
>> low on memory usage, but it is very slow. We noticed the iterator is
>> available very early on, but reading from it is slow.
>> - The other is to read the savepoint with a HashMap backend. That is
>> very fast, as expected. However, the iterator apparently only returns once
>> the whole savepoint has been loaded into the HashMap, hence the heavy
>> memory consumption.
>>
>> Is there a better way to do that? Or a way to tune it so that it does not
>> consume all the memory? Or maybe a way to read it in parts?
>>
>> Thanks
>>
>> JM
>>
>
