Hi Gabor,

I finally got to run that change through. I have a 6Gb savepoint I read and
parse for reference.
- HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
- RockDb with the patch reads it in 48 minutes (and requires less than 2Gb)
- RockDb without the patch wasn't even halfway through after 12 hours....
(I gave up)

I don't think I have any duplicates because the application that generates
the savepoint is using HashMap, so my scenario may not be representative. I
am using IBM Seremu Java 17 (openJ9-0.46).

That was run on a VM on my  laptop, so not exactly a
controlled environment. but I think it's conclusive enough. I will need to
run further tests but I think we will patch our Flink. using a system
property to configure it.

Hope this help

JM

On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Just to give an update. I've applied the mentioned patch and the execution
> time drastically decreased (the gain is 98.9%):
>
> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader                
>    [] - Execution time: PT14.690426S
>
> I need to double check what that would mean to correctness and all other
> aspects.
>
> G
>
>
> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Please report back on how the patch behaves including any side effects.
>>
>> Now I'm in testing the state reading with processor API vs the mentioned
>> job where we control the keys.
>> The difference is extreme, especially because the numbers are coming from
>> reading ~40Mb state file😅
>>
>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader               
>>     [] - Execution time: PT22M24.612954S
>> ...
>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob            
>>     [] - Execution time: PT6.930659S
>>
>> Don't need to mention that the bigger is the processor API.
>>
>> G
>>
>>
>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>
>>> That's a good idea, Sadly I have no control over the keys....
>>>
>>> I was going to patch Flink with the suggestion in FLINK-37109
>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how
>>> that goes. If that brings RockDb performance in an acceptable range for us
>>> we might go that way. I really like the light memory consumption of RockDb
>>> for that kind of side job.
>>>
>>> Thanks
>>>
>>> JM
>>>
>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> What I could imagine is to create a normal Flink job,
>>>> use execution.state-recovery.path=/path/to/savepoint
>>>> set the operator UID on a custom written operator, which opens the
>>>> state info for you.
>>>>
>>>> The only drawback is that you must know the keyBy range... this can be
>>>> problematic but if you can do it it's a win :)
>>>>
>>>> G
>>>>
>>>>
>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>>>> instead of as a single blob up front which I think is what the hashmap
>>>>> does... we just want to be called for each entry and extract the bit we
>>>>> want in that scenario.
>>>>>
>>>>> Never mind
>>>>>
>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>
>>>>> JM
>>>>>
>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jean-Marc,
>>>>>>
>>>>>> We've already realized that the RocksDB approach is not reaching the
>>>>>> performance criteria which it should be. There is an open issue for it 
>>>>>> [1].
>>>>>> The hashmap based approach was and is always expecting more memory.
>>>>>> So if the memory footprint is a hard requirement then RocksDB is the only
>>>>>> way now.
>>>>>>
>>>>>> Bad to say but I can't suggest any nifty trick to make it better. All
>>>>>> I can promise that I'm now measuring performance of the RocksDB approach
>>>>>> and intended to eliminate the slowness. Since we don't know what
>>>>>> exactly causes the slowness the new Frocksdb-8.10.0 can be also an
>>>>>> imrpvement.
>>>>>>
>>>>>> All in all it will take some time to sort this out.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What would be the best approach to read a savepoint and minimise the
>>>>>>> memory consumption. We just need to transform it into something else for
>>>>>>> investigation.
>>>>>>>
>>>>>>> Our flink 1.20 streaming job is using HashMap backend, and is spread
>>>>>>> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
>>>>>>> savepoint can be 4-5Gb or more.
>>>>>>>
>>>>>>> The reader is more basic, using a Local Execution EnvironmentThis is
>>>>>>> essentially what we are doing:
>>>>>>>
>>>>>>>     StreamExecutionEnvironment env =
>>>>>>> LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>     env.setParallelism(1);
>>>>>>>
>>>>>>>     SavepointReader savepoint =
>>>>>>>         SavepointReader.read(env, savepointLocation, new
>>>>>>> HashMapStateBackend());
>>>>>>>     // SavepointReader.read(env, savepointLocation, new
>>>>>>> EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>
>>>>>>>
>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>> mainOperatorState =
>>>>>>>         savepoint.readKeyedState(
>>>>>>>             MAIN_OPERATOR,
>>>>>>>             new
>>>>>>> StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>
>>>>>>>
>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>> stateReader = mainOperatorState.executeAndCollect()
>>>>>>>     stateReader.forEachRemaining( record -> { ...
>>>>>>>         /// extract what we need here
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>> We tried two approaches:
>>>>>>> - One is to read the savepoint with a rockDb backend. That works
>>>>>>> and is low on memory usage, but is very very slow. We noticed the 
>>>>>>> iterator
>>>>>>> is available very early on, but it is slow...
>>>>>>> - The other is to read the savepoint with a HashMap backend. That
>>>>>>> is very fast, as expected. However the iterator apparently only returns
>>>>>>> once the whole savepoint has been loaded in the HashMap, so heavy memory
>>>>>>> consumption.
>>>>>>>
>>>>>>> Is there a better way to do that? or a way to tune it so that it
>>>>>>> does not consume all the memory ? or maybe reading it in parts...
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> JM
>>>>>>>
>>>>>>

Reply via email to