What I could imagine is to create a normal Flink job, set
execution.state-recovery.path=/path/to/savepoint, and put the operator UID
on a custom-written operator, which then opens the state info for you.

The only drawback is that you must know the keyBy range. This can be
problematic, but if you can do it, it's a win :)
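
As a rough sketch of the configuration side of this idea, the reader job
could be started with something like the fragment below. The path is a
placeholder. The second key is my assumption: a job that defines only the
one operator has to skip the state of all the other operators in the
savepoint, and as far as I know this is the option for it. Please check the
key names against the configuration reference for your Flink version.

```yaml
# Sketch of the Flink configuration for the reader job.
# Placeholder path from the suggestion above, not a real location.
execution.state-recovery.path: /path/to/savepoint
# Assumption: skip savepoint state that belongs to operators
# the reader job does not define.
execution.state-recovery.ignore-unclaimed-state: true
```

The custom operator would then carry the same UID as the operator whose
state you want, and declare its state with the same names and types as the
original job.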

G


On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> Hi Gabor,
>
> I thought so. I was hoping for a way to read the savepoint in pages,
> instead of as a single blob up front which I think is what the hashmap
> does... we just want to be called for each entry and extract the bit we
> want in that scenario.
>
> Never mind
>
> Thank you for the insight. Saves me a lot of hunting for nothing.
>
> JM
>
> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi Jean-Marc,
>>
>> We've already realized that the RocksDB approach is not reaching the
>> performance it should. There is an open issue for it [1].
>> The hashmap-based approach has always required more memory. So if
>> the memory footprint is a hard requirement, RocksDB is the only way for now.
>>
>> Sad to say, but I can't suggest any nifty trick to make it better. All I
>> can promise is that I'm now measuring the performance of the RocksDB
>> approach and intend to eliminate the slowness. Since we don't know what
>> exactly causes the slowness, the new Frocksdb-8.10.0 may also be an
>> improvement.
>>
>> All in all it will take some time to sort this out.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>
>> BR,
>> G
>>
>>
>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com>
>> wrote:
>>
>>> What would be the best approach to read a savepoint while minimising
>>> memory consumption? We just need to transform it into something else for
>>> investigation.
>>>
>>> Our Flink 1.20 streaming job is using the HashMap backend, and is spread
>>> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
>>> savepoint can be 4-5 GB or more.
>>>
>>> The reader is more basic, using a local execution environment. This is
>>> essentially what we are doing:
>>>
>>>     StreamExecutionEnvironment env =
>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>     env.setParallelism(1);
>>>
>>>     SavepointReader savepoint =
>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>     // SavepointReader.read(env, savepointLocation,
>>>     //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>
>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>         savepoint.readKeyedState(
>>>             MAIN_OPERATOR,
>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>
>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>         mainOperatorState.executeAndCollect();
>>>     stateReader.forEachRemaining(record -> { ...
>>>         // extract what we need here
>>>     });
>>>
>>>
>>> We tried two approaches:
>>> - One is to read the savepoint with a RocksDB backend. That works and is
>>> low on memory usage, but is very slow. We noticed the iterator is
>>> available very early on, but it is slow...
>>> - The other is to read the savepoint with a HashMap backend. That is
>>> very fast, as expected. However the iterator apparently only returns once
>>> the whole savepoint has been loaded in the HashMap, so heavy memory
>>> consumption.
>>>
>>> Is there a better way to do that? Or a way to tune it so that it does
>>> not consume all the memory? Or maybe reading it in parts?
>>>
>>> Thanks
>>>
>>> JM
>>>
>>
