Just to give an update. I've applied the mentioned patch and the execution
time drastically decreased (the gain is 98.9%):

2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S

I still need to double-check what this means for correctness and all other
aspects.
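
For reference, the 98.9% figure can be reproduced from the two logged
durations, assuming the baseline is the unpatched processor API run quoted
below (PT22M24.612954S) and the patched run is PT14.690426S. A minimal sketch
using java.time.Duration; the class and method names are made up for
illustration and are not part of Flink or of the test job:

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical helper, for illustration only.
class ExecutionTimeGain {

    // Relative reduction in execution time, in percent, between two
    // ISO-8601 durations as they appear in the log lines.
    static double gainPercent(String slowIso, String fastIso) {
        double slowSeconds = Duration.parse(slowIso).toNanos() / 1e9;
        double fastSeconds = Duration.parse(fastIso).toNanos() / 1e9;
        return (1.0 - fastSeconds / slowSeconds) * 100.0;
    }

    public static void main(String[] args) {
        double gain = gainPercent("PT22M24.612954S", "PT14.690426S");
        // Locale.ROOT keeps the decimal separator a dot on every machine.
        System.out.println(String.format(Locale.ROOT, "Gain: %.1f%%", gain)); // prints "Gain: 98.9%"
    }
}
```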

G


On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Please report back on how the patch behaves including any side effects.
>
> I'm now testing state reading with the processor API vs the mentioned
> job where we control the keys.
> The difference is extreme, especially considering the numbers come from
> reading a ~40Mb state file😅
>
> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
> ...
> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>
> Needless to say, the bigger number is the processor API.
>
> G
>
>
> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> That's a good idea. Sadly, I have no control over the keys...
>>
>> I was going to patch Flink with the suggestion in FLINK-37109
>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how
>> that goes. If that brings RocksDB performance into an acceptable range for us,
>> we might go that way. I really like the light memory consumption of RocksDB
>> for that kind of side job.
>>
>> Thanks
>>
>> JM
>>
>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> What I could imagine is to create a normal Flink job,
>>> use execution.state-recovery.path=/path/to/savepoint, and
>>> set the operator UID on a custom-written operator, which opens the state
>>> info for you.
>>>
>>> The only drawback is that you must know the keyBy range... this can be
>>> problematic but if you can do it it's a win :)
>>>
>>> G
>>>
>>>
>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>>> instead of as a single blob up front which I think is what the hashmap
>>>> does... we just want to be called for each entry and extract the bit we
>>>> want in that scenario.
>>>>
>>>> Never mind
>>>>
>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>
>>>> JM
>>>>
>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Jean-Marc,
>>>>>
>>>>> We've already realized that the RocksDB approach is not meeting the
>>>>> performance criteria it should. There is an open issue for it
>>>>> [1].
>>>>> The hashmap-based approach has always required more memory. So
>>>>> if the memory footprint is a hard requirement then RocksDB is the only way
>>>>> for now.
>>>>>
>>>>> Sad to say, but I can't suggest any nifty trick to make it better. All
>>>>> I can promise is that I'm now measuring the performance of the RocksDB approach
>>>>> and intend to eliminate the slowness. Since we don't know what
>>>>> exactly causes the slowness, the new Frocksdb-8.10.0 may also be an
>>>>> improvement.
>>>>>
>>>>> All in all it will take some time to sort this out.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> What would be the best approach to read a savepoint and minimise the
>>>>>> memory consumption? We just need to transform it into something else for
>>>>>> investigation.
>>>>>>
>>>>>> Our Flink 1.20 streaming job uses the HashMap backend, and is spread
>>>>>> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
>>>>>> savepoint can be 4-5Gb or more.
>>>>>>
>>>>>> The reader is more basic, using a local execution environment. This is
>>>>>> essentially what we are doing:
>>>>>>
>>>>>>     StreamExecutionEnvironment env =
>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>     env.setParallelism(1);
>>>>>>
>>>>>>     SavepointReader savepoint =
>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>
>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>         savepoint.readKeyedState(
>>>>>>             MAIN_OPERATOR,
>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>
>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>     stateReader.forEachRemaining(record -> { ...
>>>>>>         // extract what we need here
>>>>>>     });
>>>>>>
>>>>>>
>>>>>> We tried two approaches:
>>>>>> - One is to read the savepoint with a RocksDB backend. That works and
>>>>>> is low on memory usage, but is very, very slow. We noticed the iterator is
>>>>>> available very early on, but it is slow...
>>>>>> - The other is to read the savepoint with a HashMap backend. That is
>>>>>> very fast, as expected. However, the iterator apparently only returns once
>>>>>> the whole savepoint has been loaded into the HashMap, hence the heavy memory
>>>>>> consumption.
>>>>>>
>>>>>> Is there a better way to do that? Or a way to tune it so that it does
>>>>>> not consume all the memory? Or maybe reading it in parts...
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> JM
>>>>>>
>>>>>
