Hi Jean-Marc, FYI, I've just opened this [1] PR to address the issue in a clean way. May I ask you to test it on your side?
[1] https://github.com/apache/flink/pull/26134

BR,
G

On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Just a little update on this. We've made our first POC with the redesigned
> approach and the numbers are promising :)
> It still requires huge effort from a development/correctness/performance
> perspective, but it seems like we have something in the pocket.
>
> Test data: 256Mb state file with a single operator and 2 value states
> - Old execution time: 25M27.126737S
> - New execution time: 1M19.602042S
> In short: ~95% performance gain.
>
> G
>
>
> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> In short, when you don't care about multiple
>> KeyedStateReaderFunction.readKey calls, then you're on the safe side.
>>
>> G
>>
>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>
>>> I am still hoping that I am still good. I just read the savepoint to
>>> extract information (parallelism 1, and only 1 task manager). I also know
>>> it has been created by a job using a HashMap backend. And I do not care
>>> about duplicates.
>>>
>>> I should still be good, right? From what I saw I never read any
>>> duplicate keys.
>>>
>>> Thanks
>>>
>>> JM
>>>
>>>
>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Hi Guys,
>>>>
>>>> We've just had an in-depth analysis and we think that removing that
>>>> particular line causes correctness issues under some circumstances.
>>>>
>>>> Namely, key duplicates can happen when multiple column families are
>>>> processed at the same time. No need to mention that it would cause multiple
>>>> `readKey` calls, which ends up in just wrong calculation logic (for
>>>> example in a simple sum calculation).
>>>>
>>>> We have a vision of how this can be solved in a clean way, but it will
>>>> take some time.
>>>>
>>>> > Are there any plans on a migration guide or something for users to
>>>> > adapt their QS observers (beyond the current docs)?
>>>>
>>>> The gap between the two approaches is quite huge, and considering the
>>>> actual bugs and improvement possibilities in the State Processor API,
>>>> I would say this can come later, at least on my plate. When you see
>>>> the gaps and you know how to fill them, feel free to contribute and
>>>> we can shepherd the PRs.
>>>>
>>>> G
>>>>
>>>>
>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>
>>>>> Thanks both for your work on this!
>>>>>
>>>>> On a related note, since Queryable State (QS) is going away soon,
>>>>> streamlining the State Processor API as much as possible makes a lot of
>>>>> sense.
>>>>>
>>>>> Are there any plans on a migration guide or something for users to
>>>>> adapt their QS observers (beyond the current docs)?
>>>>> (State-)Observability-wise, Flink has some room for improvement, I
>>>>> would say.
>>>>>
>>>>> Regards,
>>>>>
>>>>> Salva
>>>>>
>>>>>
>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jean-Marc,
>>>>>>
>>>>>> Thanks for your time investment and for sharing the numbers, it's
>>>>>> super helpful.
>>>>>> Ping me any time when you have further info to share.
>>>>>>
>>>>>> About the numbers: 48 minutes for 6Gb is not good but not terrible.
>>>>>> I've seen petabyte-scale states, so I'm pretty sure we need to go
>>>>>> beyond...
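To make the point about duplicate KeyedStateReaderFunction.readKey calls concrete, here is a rough, untested sketch. The classes, the String key type and the "counter" state name are invented for illustration and are not taken from the thread; only the readKey contract itself comes from the State Processor API.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// Safe under duplicate readKey calls: each invocation only emits what it reads
// for that key, so a duplicate call merely produces a duplicate record.
class PerKeyExtractor extends KeyedStateReaderFunction<String, String> {

    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Types.LONG));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
        out.collect(key + "=" + counter.value());
    }
}

// NOT safe: accumulating across readKey calls double-counts a key when the
// function is invoked twice for it (the "simple sum" case mentioned above).
class GlobalSumReader extends KeyedStateReaderFunction<String, Long> {

    private transient ValueState<Long> counter;
    private long runningTotal; // shared across keys, so duplicate calls corrupt it

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Types.LONG));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
        Long value = counter.value();
        runningTotal += value == null ? 0L : value;
        out.collect(runningTotal);
    }
}

If readKey is ever called twice for the same key, PerKeyExtractor only yields a duplicate record that can be deduplicated afterwards, while GlobalSumReader silently over-counts.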
>>>>>>
>>>>>> Since we measure similar numbers with the unpatched Flink, plus this
>>>>>> has been reported by several users,
>>>>>> we must make changes in this area. It's still a question whether the
>>>>>> tested patch is the right approach,
>>>>>> but at least we've touched the root cause.
>>>>>>
>>>>>> The next step on my side is to have a deep dive and understand all
>>>>>> the aspects of why the remove is there,
>>>>>> how the remove elimination would affect existing use-cases, and
>>>>>> consider all other possibilities.
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>
>>>>>>> Hi Gabor,
>>>>>>>
>>>>>>> I finally got to run that change through. I have a 6Gb savepoint I
>>>>>>> read and parse for reference.
>>>>>>> - HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less
>>>>>>> than 2Gb)
>>>>>>> - RocksDB without the patch wasn't even halfway through after 12
>>>>>>> hours... (I gave up)
>>>>>>>
>>>>>>> I don't think I have any duplicates because the application that
>>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>>> representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>>>>>>
>>>>>>> That was run on a VM on my laptop, so not exactly a controlled
>>>>>>> environment, but I think it's conclusive enough. I will need to
>>>>>>> run further tests, but I think we will patch our Flink, using a
>>>>>>> system property to configure it.
>>>>>>>
>>>>>>> Hope this helps
>>>>>>>
>>>>>>> JM
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>>>
>>>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>
>>>>>>>> I need to double check what that would mean for correctness and
>>>>>>>> all other aspects.
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Please report back on how the patch behaves, including any side
>>>>>>>>> effects.
>>>>>>>>>
>>>>>>>>> Now I'm testing the state reading with the processor API vs the
>>>>>>>>> mentioned job where we control the keys.
>>>>>>>>> The difference is extreme, especially because the numbers are
>>>>>>>>> coming from reading a ~40Mb state file 😅
>>>>>>>>>
>>>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>> ...
>>>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>
>>>>>>>>> No need to mention that the bigger one is the processor API.
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>
>>>>>>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>>>>>>
>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see
>>>>>>>>>> how that goes. If that brings RocksDB performance into an
>>>>>>>>>> acceptable range for us, we might go that way. I really like the
>>>>>>>>>> light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> JM
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>>>>>>>> set the operator UID on a custom written operator, which opens
>>>>>>>>>>> the state info for you.
>>>>>>>>>>>
>>>>>>>>>>> The only drawback is that you must know the keyBy range... this
>>>>>>>>>>> can be problematic, but if you can do it, it's a win :)
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>
>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>>>> pages, instead of as a single blob up front, which I think is
>>>>>>>>>>>> what the hashmap does... we just want to be called for each
>>>>>>>>>>>> entry and extract the bit we want in that scenario.
>>>>>>>>>>>>
>>>>>>>>>>>> Never mind.
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>> nothing.
>>>>>>>>>>>>
>>>>>>>>>>>> JM
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>> reaching the performance criteria it should. There is an open
>>>>>>>>>>>>> issue for it [1].
>>>>>>>>>>>>> The hashmap-based approach was and is always expecting more
>>>>>>>>>>>>> memory. So if the memory footprint is a hard requirement,
>>>>>>>>>>>>> then RocksDB is the only way now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Bad to say, but I can't suggest any nifty trick to make it
>>>>>>>>>>>>> better. All I can promise is that I'm now measuring the
>>>>>>>>>>>>> performance of the RocksDB approach
>>>>>>>>>>>>> and intend to eliminate the slowness. Since we don't know
>>>>>>>>>>>>> what exactly causes the slowness, the new FRocksDB 8.10.0 can
>>>>>>>>>>>>> also be an improvement.
>>>>>>>>>>>>>
>>>>>>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>
>>>>>>>>>>>>> BR,
>>>>>>>>>>>>> G
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>> minimise the memory consumption? We just need to transform
>>>>>>>>>>>>>> it into something else for investigation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend,
>>>>>>>>>>>>>> and is spread over 6 task slots in 6 pods (under k8s).
>>>>>>>>>>>>>> Savepoints are saved on S3.
>>>>>>>>>>>>>> A savepoint can be 4-5Gb or more.
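As a rough, untested sketch of the probe-job idea above: the "main-operator" UID, the String keys and the "counter" ValueState are placeholders that must be replaced with the original job's operator UID, key type and state descriptor; the execution.state-recovery.path option name is the Flink 1.20 one quoted above.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SavepointProbeJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Restore from the savepoint on start-up (Flink 1.20 option name quoted above).
        conf.setString("execution.state-recovery.path", "/path/to/savepoint");
        // If the savepoint also contains other operators' state, non-restored state
        // has to be allowed as well (option name depends on the Flink version).

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        // The max parallelism typically has to match the original job so that
        // key groups are assigned correctly.

        env.fromData("key-1", "key-2", "key-3") // the keyBy range has to be known up front
           .keyBy(k -> k)
           .process(new StateProbe())
           .uid("main-operator") // must match the UID of the operator that owns the state
           .print();

        env.execute("savepoint-probe");
    }

    /** Emits the restored ValueState for every key driven through the operator. */
    public static class StateProbe extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            // Must match the state name and type used by the original job.
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Types.LONG));
        }

        @Override
        public void processElement(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + "=" + counter.value());
        }
    }
}

The drawback G mentions shows up in fromData(...): this only works if the keyBy range can be enumerated, or at least covered.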
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>     // ... extract what we need here
>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That
>>>>>>>>>>>>>> works and is low on memory usage, but is very, very slow. We
>>>>>>>>>>>>>> noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend.
>>>>>>>>>>>>>> That is very fast, as expected. However, the iterator
>>>>>>>>>>>>>> apparently only returns once the whole savepoint has been
>>>>>>>>>>>>>> loaded into the HashMap, hence the heavy memory consumption.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so
>>>>>>>>>>>>>> that it does not consume all the memory? Or maybe reading it
>>>>>>>>>>>>>> in parts...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JM
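For completeness, here is a rough sketch of the lower-memory variant the rest of the thread converges on: read with the RocksDB backend and consume the iterator incrementally. It reuses the illustrative PerKeyExtractor from the earlier sketch; "main-operator" and the s3:// path are placeholders, and the RocksDB path only becomes reasonably fast once the FLINK-37109 fix (PR 26134 above) is in place.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

public class LowMemorySavepointDump {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        // RocksDB keeps the working set on disk while reading, so memory stays low;
        // how fast this is depends on the FLINK-37109 / PR 26134 work discussed above.
        SavepointReader savepoint = SavepointReader.read(
                env, "s3://bucket/path/to/savepoint", new EmbeddedRocksDBStateBackend());

        // PerKeyExtractor is the illustrative reader function sketched earlier;
        // "main-operator" stands in for the real operator UID.
        DataStream<String> keyedState =
                savepoint.readKeyedState("main-operator", new PerKeyExtractor());

        // try-with-resources: records are pulled back one at a time and the job is
        // cleaned up when the iterator is closed.
        try (CloseableIterator<String> it = keyedState.executeAndCollect()) {
            while (it.hasNext()) {
                System.out.println(it.next()); // extract/transform what is needed here
            }
        }
    }
}

executeAndCollect() pulls results back incrementally, so the driver never has to hold the whole savepoint in memory, which matches the low RocksDB memory numbers reported earlier in the thread.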