Hi Jean-Marc,

FYI, I've just opened this [1] PR to address the issue in a clean way.
May I ask you to test it on your side?

[1] https://github.com/apache/flink/pull/26134

BR,
G


On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Just a little update on this. We've made our first POC with the redesigned
> approach and the numbers are promising :)
> It still requires huge efforts in development/correctness/performance
> perspective but seems like we have something in the pocket.
>
> Test data: 256Mb state file with a single operator and 2 value states
> - Old execution time: 25M27.126737S
> - New execution time: 1M19.602042S
> In short: ~95% performance gain.
>
> G
>
>
> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> In short, when you don't care about
>> multiple KeyedStateReaderFunction.readKey calls then you're on the safe
>> side.
>>
>> G
>>
>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>
>>> I am still hoping that I am still good. I just read the savepoint to
>>> extract information (parallelism 1, and only 1 task manager) . I also know
>>> it has been created by a job using a HashMap backend. And I do not care
>>> about duplicates.
>>>
>>> I should still be good, right? from what I saw I never read any
>>> duplicate keys.
>>>
>>> Thanks
>>>
>>> JM
>>>
>>>
>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> We've just had an in-depth analysis and we think that removing that
>>>> particular line causes correctness issues under some circumstances.
>>>>
>>>> Namely key duplicates can happen when multiple column families are
>>>> processed at the same time. Not need to mention that it would cause 
>>>> multiple
>>>> `readKey` calls which ends up in just wrong calculation logic (for
>>>> example in simple sum calculation).
>>>>
>>>> We've a vision how this can be solved in a clean way but it will take
>>>> some time.
>>>>
>>>> > Are there any plans on a migration guide or something for users to
>>>> adapt their QS observers (beyond the current docs)?
>>>>
>>>> The gap between the two approaches are quite huge and considering the
>>>> actual bugs and improvement possibilities in the state processor API
>>>> I would say this can come later on at least on my plate. When you see
>>>> the gaps and you know how to fill them feel free to contribute and
>>>> we can shepherd the PRs.
>>>>
>>>> G
>>>>
>>>>
>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <
>>>> salcantara...@gmail.com> wrote:
>>>>
>>>>> Thanks both for your work on this!
>>>>>
>>>>> On a related note, since Queryable State (QS) is going away soon,
>>>>> streamlining the State Processor API as much as possible makes a lot of
>>>>> sense.
>>>>>
>>>>> Are there any plans on a migration guide or something for users to
>>>>> adapt their QS observers (beyond the current docs)?
>>>>> (State-)Observability-wise Flink has some room for improvement I would 
>>>>> say.
>>>>>
>>>>> Regards,
>>>>>
>>>>> Salva
>>>>>
>>>>>
>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jean-Marc,
>>>>>>
>>>>>> Thanks for your time investment and to share the numbers, it's super
>>>>>> helpful.
>>>>>> Ping me any time when you have further info to share.
>>>>>>
>>>>>> About the numbers: 48 minutes for 6Gb is not good but not terrible.
>>>>>> I've seen petabyte scale states so I'm pretty sure we need to go
>>>>>> beyond...
>>>>>>
>>>>>> Since we measure similar numbers with the unpatched Flink plus this
>>>>>> has been reported this by several users,
>>>>>> we must make changes in this area. It's still a question whether the
>>>>>> tested patch is the right approach
>>>>>> but at least we've touched the root cause.
>>>>>>
>>>>>> The next step on my side is to have a deep dive and understand all
>>>>>> the aspects why remove is there,
>>>>>> how the remove elimination would effect existing use-cases and
>>>>>> consider all other possibilities.
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Gabor,
>>>>>>>
>>>>>>> I finally got to run that change through. I have a 6Gb savepoint I
>>>>>>> read and parse for reference.
>>>>>>> - HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
>>>>>>> - RockDb with the patch reads it in 48 minutes (and requires less
>>>>>>> than 2Gb)
>>>>>>> - RockDb without the patch wasn't even halfway through after 12
>>>>>>> hours.... (I gave up)
>>>>>>>
>>>>>>> I don't think I have any duplicates because the application that
>>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>>> representative. I am using IBM Seremu Java 17 (openJ9-0.46).
>>>>>>>
>>>>>>> That was run on a VM on my  laptop, so not exactly a
>>>>>>> controlled environment. but I think it's conclusive enough. I will need 
>>>>>>> to
>>>>>>> run further tests but I think we will patch our Flink. using a system
>>>>>>> property to configure it.
>>>>>>>
>>>>>>> Hope this help
>>>>>>>
>>>>>>> JM
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>>>
>>>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader         
>>>>>>>>           [] - Execution time: PT14.690426S
>>>>>>>>
>>>>>>>> I need to double check what that would mean to correctness and all
>>>>>>>> other aspects.
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Please report back on how the patch behaves including any side
>>>>>>>>> effects.
>>>>>>>>>
>>>>>>>>> Now I'm in testing the state reading with processor API vs the
>>>>>>>>> mentioned job where we control the keys.
>>>>>>>>> The difference is extreme, especially because the numbers are
>>>>>>>>> coming from reading ~40Mb state file😅
>>>>>>>>>
>>>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader        
>>>>>>>>>            [] - Execution time: PT22M24.612954S
>>>>>>>>> ...
>>>>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob     
>>>>>>>>>            [] - Execution time: PT6.930659S
>>>>>>>>>
>>>>>>>>> Don't need to mention that the bigger is the processor API.
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> That's a good idea, Sadly I have no control over the keys....
>>>>>>>>>>
>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see
>>>>>>>>>> how that goes. If that brings RockDb performance in an acceptable 
>>>>>>>>>> range for
>>>>>>>>>> us we might go that way. I really like the light memory consumption 
>>>>>>>>>> of
>>>>>>>>>> RockDb for that kind of side job.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> JM
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint
>>>>>>>>>>> set the operator UID on a custom written operator, which opens
>>>>>>>>>>> the state info for you.
>>>>>>>>>>>
>>>>>>>>>>> The only drawback is that you must know the keyBy range... this
>>>>>>>>>>> can be problematic but if you can do it it's a win :)
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <
>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>
>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>>>> pages, instead of as a single blob up front which I think is what 
>>>>>>>>>>>> the
>>>>>>>>>>>> hashmap does... we just want to be called for each entry and 
>>>>>>>>>>>> extract the
>>>>>>>>>>>> bit we want in that scenario.
>>>>>>>>>>>>
>>>>>>>>>>>> Never mind
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>> nothing.
>>>>>>>>>>>>
>>>>>>>>>>>> JM
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>> reaching the performance criteria which it should be. There is an 
>>>>>>>>>>>>> open
>>>>>>>>>>>>> issue for it [1].
>>>>>>>>>>>>> The hashmap based approach was and is always expecting more
>>>>>>>>>>>>> memory. So if the memory footprint is a hard requirement then 
>>>>>>>>>>>>> RocksDB is
>>>>>>>>>>>>> the only way now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Bad to say but I can't suggest any nifty trick to make it
>>>>>>>>>>>>> better. All I can promise that I'm now measuring performance of 
>>>>>>>>>>>>> the RocksDB
>>>>>>>>>>>>> approach
>>>>>>>>>>>>> and intended to eliminate the slowness. Since we don't know
>>>>>>>>>>>>> what exactly causes the slowness the new Frocksdb-8.10.0 can be 
>>>>>>>>>>>>> also an
>>>>>>>>>>>>> imrpvement.
>>>>>>>>>>>>>
>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>
>>>>>>>>>>>>> BR,
>>>>>>>>>>>>> G
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>> minimise the memory consumption. We just need to transform it 
>>>>>>>>>>>>>> into
>>>>>>>>>>>>>> something else for investigation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Our flink 1.20 streaming job is using HashMap backend, and is
>>>>>>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are 
>>>>>>>>>>>>>> saved on S3.
>>>>>>>>>>>>>> A savepoint can be 4-5Gb or more.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The reader is more basic, using a Local Execution
>>>>>>>>>>>>>> EnvironmentThis is essentially what we are doing:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>> LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new
>>>>>>>>>>>>>> HashMapStateBackend());
>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new
>>>>>>>>>>>>>> EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>>>>> mainOperatorState =
>>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>>             new
>>>>>>>>>>>>>> StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>>>>> stateReader = mainOperatorState.executeAndCollect()
>>>>>>>>>>>>>>     stateReader.forEachRemaining( record -> { ...
>>>>>>>>>>>>>>         /// extract what we need here
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>> - One is to read the savepoint with a rockDb backend. That
>>>>>>>>>>>>>> works and is low on memory usage, but is very very slow. We 
>>>>>>>>>>>>>> noticed the
>>>>>>>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend.
>>>>>>>>>>>>>> That is very fast, as expected. However the iterator apparently 
>>>>>>>>>>>>>> only
>>>>>>>>>>>>>> returns once the whole savepoint has been loaded in the HashMap, 
>>>>>>>>>>>>>> so heavy
>>>>>>>>>>>>>> memory consumption.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there a better way to do that? or a way to tune it so that
>>>>>>>>>>>>>> it does not consume all the memory ? or maybe reading it in 
>>>>>>>>>>>>>> parts...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>
>>>>>>>>>>>>>

Reply via email to