Just a little update on this. We've made our first POC with the redesigned
approach and the numbers are promising :)
It still needs a lot of work on the development, correctness, and performance
fronts, but it looks like we have something in our pocket.

Test data: 256 MB state file with a single operator and 2 value states
- Old execution time: 25M27.126737S
- New execution time: 1M19.602042S
In short: execution time dropped by ~95%.
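
As a sanity check on the figure above, here is a minimal sketch of the
arithmetic. It is not part of the POC: the class name GainCheck and the
gain/speedup helpers are made up for illustration, and the two timings are
written with an ISO-8601 "PT" prefix (an assumption about their format) so
that java.time.Duration can parse them.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical helper, only for re-checking the numbers quoted in this mail.
final class GainCheck {

    // Fraction of the old execution time that the new run saves.
    static double gain(Duration oldTime, Duration newTime) {
        return 1.0 - (double) newTime.toNanos() / oldTime.toNanos();
    }

    // How many times faster the new run is compared to the old one.
    static double speedup(Duration oldTime, Duration newTime) {
        return (double) oldTime.toNanos() / newTime.toNanos();
    }

    public static void main(String[] args) {
        // Timings from the test above, with "PT" prepended for Duration.parse.
        Duration oldTime = Duration.parse("PT25M27.126737S");
        Duration newTime = Duration.parse("PT1M19.602042S");

        System.out.printf(Locale.ROOT, "time saved: %.1f%%%n",
                100.0 * gain(oldTime, newTime));
        System.out.printf(Locale.ROOT, "speedup: %.1fx%n",
                speedup(oldTime, newTime));
    }
}
```

With the timings above this works out to roughly 94.8% less execution time,
which is about a 19x speedup.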

G


On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> In short, as long as you don't care about multiple
> KeyedStateReaderFunction.readKey calls for the same key, you're on the safe
> side.
>
> G
>
> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> I am still hoping that I am good. I just read the savepoint to
>> extract information (parallelism 1, and only 1 task manager). I also know
>> it has been created by a job using a HashMap backend. And I do not care
>> about duplicates.
>>
>> I should still be good, right? From what I saw, I never read any duplicate
>> keys.
>>
>> Thanks
>>
>> JM
>>
>>
>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Guys,
>>>
>>> We've just had an in-depth analysis and we think that removing that
>>> particular line causes correctness issues under some circumstances.
>>>
>>> Namely, key duplicates can happen when multiple column families are
>>> processed at the same time. Needless to say, that would cause multiple
>>> `readKey` calls for the same key, which ends up in plainly wrong results
>>> (for example in a simple sum calculation).
>>>
>>> We have an idea of how this can be solved in a clean way, but it will take
>>> some time.
>>>
>>> > Are there any plans on a migration guide or something for users to
>>> > adapt their QS observers (beyond the current docs)?
>>>
>>> The gap between the two approaches is quite large, and considering the
>>> actual bugs and improvement possibilities in the State Processor API,
>>> I would say this can come later, at least on my plate. If you see
>>> the gaps and know how to fill them, feel free to contribute and
>>> we can shepherd the PRs.
>>>
>>> G
>>>
>>>
>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>>> Thanks both for your work on this!
>>>>
>>>> On a related note, since Queryable State (QS) is going away soon,
>>>> streamlining the State Processor API as much as possible makes a lot of
>>>> sense.
>>>>
>>>> Are there any plans on a migration guide or something for users to
>>>> adapt their QS observers (beyond the current docs)?
>>>> (State-)Observability-wise Flink has some room for improvement I would say.
>>>>
>>>> Regards,
>>>>
>>>> Salva
>>>>
>>>>
>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jean-Marc,
>>>>>
>>>>> Thanks for investing the time and for sharing the numbers, it's super
>>>>> helpful.
>>>>> Ping me any time you have further info to share.
>>>>>
>>>>> About the numbers: 48 minutes for 6 GB is not good but not terrible.
>>>>> I've seen petabyte-scale states, so I'm pretty sure we need to go
>>>>> beyond that...
>>>>>
>>>>> Since we measure similar numbers with unpatched Flink, and this
>>>>> has been reported by several users,
>>>>> we must make changes in this area. It's still a question whether the
>>>>> tested patch is the right approach,
>>>>> but at least we've touched the root cause.
>>>>>
>>>>> The next step on my side is to do a deep dive and understand why the
>>>>> remove is there,
>>>>> how eliminating it would affect existing use cases, and to
>>>>> consider all other possibilities.
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>> wrote:
>>>>>
>>>>>> Hi Gabor,
>>>>>>
>>>>>> I finally got to run that change through. I have a 6 GB savepoint I
>>>>>> read and parse for reference.
>>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less
>>>>>> than 2 GB)
>>>>>> - RocksDB without the patch wasn't even halfway through after 12
>>>>>> hours... (I gave up)
>>>>>>
>>>>>> I don't think I have any duplicates because the application that
>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>> representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>>>>>
>>>>>> That was run on a VM on my laptop, so not exactly a controlled
>>>>>> environment, but I think it's conclusive enough. I will need to run
>>>>>> further tests, but I think we will patch our Flink, using a system
>>>>>> property to configure it.
>>>>>>
>>>>>> Hope this helps
>>>>>>
>>>>>> JM
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>>
>>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>
>>>>>>> I need to double-check what that would mean for correctness and all
>>>>>>> other aspects.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Please report back on how the patch behaves including any side
>>>>>>>> effects.
>>>>>>>>
>>>>>>>> I'm now testing state reading with the State Processor API vs the
>>>>>>>> mentioned job where we control the keys.
>>>>>>>> The difference is extreme, especially because the numbers are
>>>>>>>> coming from reading a ~40 MB state file 😅
>>>>>>>>
>>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>> ...
>>>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>
>>>>>>>> Needless to say, the larger number is the State Processor API.
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> That's a good idea. Sadly, I have no control over the keys...
>>>>>>>>>
>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see
>>>>>>>>> how that goes. If that brings RocksDB performance into an acceptable
>>>>>>>>> range for us, we might go that way. I really like the light memory
>>>>>>>>> consumption of RocksDB for that kind of side job.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> What I could imagine is to create a normal Flink job, set
>>>>>>>>>> execution.state-recovery.path=/path/to/savepoint, and set the
>>>>>>>>>> operator UID on a custom-written operator, which opens the state
>>>>>>>>>> info for you.
>>>>>>>>>>
>>>>>>>>>> The only drawback is that you must know the keyBy range... this
>>>>>>>>>> can be problematic but if you can do it it's a win :)
>>>>>>>>>>
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>
>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>>> pages, instead of as a single blob up front, which I think is what
>>>>>>>>>>> the HashMap backend does... We just want to be called for each entry
>>>>>>>>>>> and extract the bit we want in that scenario.
>>>>>>>>>>>
>>>>>>>>>>> Never mind
>>>>>>>>>>>
>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>>>
>>>>>>>>>>> JM
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>
>>>>>>>>>>>> We've already realized that the RocksDB approach is not reaching
>>>>>>>>>>>> the performance it should. There is an open issue for it [1].
>>>>>>>>>>>> The HashMap-based approach has always required more memory. So if
>>>>>>>>>>>> the memory footprint is a hard requirement, then RocksDB is the
>>>>>>>>>>>> only way for now.
>>>>>>>>>>>>
>>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it better.
>>>>>>>>>>>> All I can promise is that I'm now measuring the performance of the
>>>>>>>>>>>> RocksDB approach and intend to eliminate the slowness. Since we
>>>>>>>>>>>> don't know what exactly causes the slowness, the new
>>>>>>>>>>>> Frocksdb-8.10.0 can also be an improvement.
>>>>>>>>>>>>
>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>
>>>>>>>>>>>> BR,
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What would be the best approach to read a savepoint while
>>>>>>>>>>>>> minimising memory consumption? We just need to transform it into
>>>>>>>>>>>>> something else for investigation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is
>>>>>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are
>>>>>>>>>>>>> saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reader is more basic, using a local execution environment.
>>>>>>>>>>>>> This is essentially what we are doing:
>>>>>>>>>>>>>
>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>
>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>
>>>>>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>
>>>>>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>     stateReader.forEachRemaining(record -> { ...
>>>>>>>>>>>>>         // extract what we need here
>>>>>>>>>>>>>     });
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works
>>>>>>>>>>>>> and is low on memory usage, but is very, very slow. We noticed the
>>>>>>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That
>>>>>>>>>>>>> is very fast, as expected. However, the iterator apparently only
>>>>>>>>>>>>> returns once the whole savepoint has been loaded into the HashMap,
>>>>>>>>>>>>> hence the heavy memory consumption.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it
>>>>>>>>>>>>> does not consume all the memory? Or maybe reading it in parts...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>
