Please report back on how the patch behaves including any side effects.

I'm now testing state reading with the State Processor API vs. the mentioned
job where we control the keys.
The difference is extreme, especially since these numbers come from
reading a ~40 MB state file 😅

2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
...
2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S

Needless to say, the bigger number is the processor API.
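
To put a number on the gap, here is a minimal sketch that parses the two
execution times from the log lines above and divides them. The class and
method names (ExecutionTimeRatio, ratio) are made up for illustration and are
not part of Flink or of the test code; only java.time.Duration from the JDK
is used.

```java
import java.time.Duration;

public class ExecutionTimeRatio {

    // Parses two ISO-8601 durations (the format printed in the log lines)
    // and returns how many times longer the first one is than the second.
    public static double ratio(String slower, String faster) {
        Duration a = Duration.parse(slower);
        Duration b = Duration.parse(faster);
        return (double) a.toNanos() / b.toNanos();
    }

    public static void main(String[] args) {
        // Values copied from the two "Execution time" log lines.
        double r = ratio("PT22M24.612954S", "PT6.930659S");
        System.out.printf("Processor API run took about %.0fx longer%n", r);
    }
}
```

With the two logged values this works out to roughly 194x, i.e. about
22.4 minutes against about 7 seconds for the same state file.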

G


On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> That's a good idea. Sadly, I have no control over the keys...
>
> I was going to patch Flink with the suggestion in FLINK-37109
> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how that
> goes. If that brings RocksDB performance into an acceptable range for us, we
> might go that way. I really like the light memory consumption of RocksDB for
> that kind of side job.
>
> Thanks
>
> JM
>
> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> What I could imagine is to create a normal Flink job,
>> set execution.state-recovery.path=/path/to/savepoint,
>> and set the operator UID on a custom-written operator, which opens the state
>> info for you.
>>
>> The only drawback is that you must know the keyBy range... this can be
>> problematic, but if you can do it, it's a win :)
>>
>> G
>>
>>
>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>
>>> Hi Gabor,
>>>
>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>> instead of as a single blob up front which I think is what the hashmap
>>> does... we just want to be called for each entry and extract the bit we
>>> want in that scenario.
>>>
>>> Never mind
>>>
>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>
>>> JM
>>>
>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jean-Marc,
>>>>
>>>> We've already realized that the RocksDB approach is not reaching the
>>>> performance level it should. There is an open issue for it [1].
>>>> The hashmap-based approach has always required more memory. So
>>>> if the memory footprint is a hard requirement, then RocksDB is the only way
>>>> for now.
>>>>
>>>> Sad to say, but I can't suggest any nifty trick to make it better. All I
>>>> can promise is that I'm now measuring the performance of the RocksDB
>>>> approach and intend to eliminate the slowness. Since we don't know what
>>>> exactly causes the slowness, the new Frocksdb-8.10.0 may also be an
>>>> improvement.
>>>>
>>>> All in all it will take some time to sort this out.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com>
>>>> wrote:
>>>>
>>>>> What would be the best approach to read a savepoint while minimising
>>>>> memory consumption? We just need to transform it into something else for
>>>>> investigation.
>>>>>
>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is spread
>>>>> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
>>>>> savepoint can be 4-5 GB or more.
>>>>>
>>>>> The reader is more basic, using a local execution environment. This is
>>>>> essentially what we are doing:
>>>>>
>>>>>     StreamExecutionEnvironment env =
>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>     env.setParallelism(1);
>>>>>
>>>>>     // HashMap backend: fast, but loads the whole savepoint into memory
>>>>>     SavepointReader savepoint =
>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>     // SavepointReader.read(env, savepointLocation,
>>>>>     //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>
>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>         mainOperatorState =
>>>>>             savepoint.readKeyedState(
>>>>>                 MAIN_OPERATOR,
>>>>>                 new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>
>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>         stateReader = mainOperatorState.executeAndCollect();
>>>>>     stateReader.forEachRemaining(record -> { ...
>>>>>         // extract what we need here
>>>>>     });
>>>>>
>>>>>
>>>>> We tried two approaches:
>>>>> - One is to read the savepoint with a RocksDB backend. That works and
>>>>> is low on memory usage, but is very, very slow. We noticed the iterator is
>>>>> available very early on, but it is slow...
>>>>> - The other is to read the savepoint with a HashMap backend. That is
>>>>> very fast, as expected. However, the iterator apparently only returns once
>>>>> the whole savepoint has been loaded into the HashMap, so memory
>>>>> consumption is heavy.
>>>>>
>>>>> Is there a better way to do that? Or a way to tune it so that it does
>>>>> not consume all the memory? Or maybe reading it in parts...
>>>>>
>>>>> Thanks
>>>>>
>>>>> JM
>>>>>
>>>>
