Hi Gabor,

Trying to, but I'm struggling to compile my stuff against your Flink build... I tried to apply your PR as a patch on my 1.20 modified fork and that didn't go well either. It will take time to untangle.
Will keep you updated if I make progress

JM

On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Jean-Marc,
>
> FYI, I've just opened this [1] PR to address the issue in a clean way.
> May I ask you to test it on your side?
>
> [1] https://github.com/apache/flink/pull/26134
>
> BR,
> G
>
> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> Just a little update on this. We've made our first POC with the redesigned approach and the numbers are promising :)
>> It still requires a huge effort from a development/correctness/performance perspective, but it seems like we have something in the pocket.
>>
>> Test data: 256 MB state file with a single operator and 2 value states
>> - Old execution time: 25M27.126737S
>> - New execution time: 1M19.602042S
>> In short: ~95% performance gain.
>>
>> G
>>
>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> In short, when you don't care about multiple KeyedStateReaderFunction.readKey calls, then you're on the safe side.
>>>
>>> G
>>>
>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>
>>>> I am still hoping that I am good. I just read the savepoint to extract information (parallelism 1, and only 1 task manager). I also know it has been created by a job using a HashMap backend. And I do not care about duplicates.
>>>>
>>>> I should still be good, right? From what I saw I never read any duplicate keys.
>>>>
>>>> Thanks
>>>>
>>>> JM
>>>>
>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> We've just had an in-depth analysis and we think that removing that particular line causes correctness issues under some circumstances.
>>>>>
>>>>> Namely, key duplicates can happen when multiple column families are processed at the same time. No need to mention that it would cause multiple `readKey` calls, which ends up in plain wrong calculation logic (for example in a simple sum calculation).
>>>>>
>>>>> We have a vision of how this can be solved in a clean way, but it will take some time.
>>>>>
>>>>> > Are there any plans for a migration guide or something for users to adapt their QS observers (beyond the current docs)?
>>>>>
>>>>> The gap between the two approaches is quite huge, and considering the actual bugs and improvement possibilities in the state processor API, I would say this can come later, at least on my plate. When you see the gaps and you know how to fill them, feel free to contribute and we can shepherd the PRs.
>>>>>
>>>>> G
>>>>>
>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>>
>>>>>> Thanks both for your work on this!
>>>>>>
>>>>>> On a related note, since Queryable State (QS) is going away soon, streamlining the State Processor API as much as possible makes a lot of sense.
>>>>>>
>>>>>> Are there any plans for a migration guide or something for users to adapt their QS observers (beyond the current docs)? (State-)Observability-wise, Flink has some room for improvement, I would say.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Salva
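
To make the readKey caveat above concrete: a reader that only emits one record per key tolerates an occasional duplicate readKey call for the same key (the duplicate can be filtered downstream), whereas anything that accumulates across calls silently double-counts. A rough sketch, with the state name and types invented for the example:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    public class CounterReaderFunction extends KeyedStateReaderFunction<String, Long> {

        private transient ValueState<Long> counter;

        // UNSAFE pattern: a running total kept inside the reader is only correct
        // if readKey is invoked exactly once per key.
        private long runningSum;

        @Override
        public void open(Configuration parameters) {
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            Long value = counter.value();
            if (value == null) {
                return; // this key has no entry in this particular state
            }
            // SAFE pattern: emit the per-key value and aggregate downstream,
            // where a duplicate key can be deduplicated before summing.
            out.collect(value);
            // Double-counts if the same key is read twice:
            runningSum += value;
        }
    }
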
>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jean-Marc,
>>>>>>>
>>>>>>> Thanks for investing your time and sharing the numbers, it's super helpful. Ping me any time when you have further info to share.
>>>>>>>
>>>>>>> About the numbers: 48 minutes for 6 GB is not good but not terrible. I've seen petabyte-scale states, so I'm pretty sure we need to go beyond...
>>>>>>>
>>>>>>> Since we measure similar numbers with the unpatched Flink, and this has been reported by several users, we must make changes in this area. It's still a question whether the tested patch is the right approach, but at least we've touched the root cause.
>>>>>>>
>>>>>>> The next step on my side is to do a deep dive and understand all the aspects of why the remove is there, how eliminating the remove would affect existing use-cases, and to consider all other possibilities.
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi Gabor,
>>>>>>>>
>>>>>>>> I finally got to run that change through. I have a 6 GB savepoint I read and parse for reference.
>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less than 2 GB)
>>>>>>>> - RocksDB without the patch wasn't even halfway through after 12 hours... (I gave up)
>>>>>>>>
>>>>>>>> I don't think I have any duplicates because the application that generates the savepoint is using HashMap, so my scenario may not be representative. I am using IBM Semeru Java 17 (OpenJ9 0.46).
>>>>>>>>
>>>>>>>> That was run on a VM on my laptop, so not exactly a controlled environment, but I think it's conclusive enough. I will need to run further tests, but I think we will patch our Flink, using a system property to configure it.
>>>>>>>>
>>>>>>>> Hope this helps
>>>>>>>>
>>>>>>>> JM
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Just to give an update. I've applied the mentioned patch and the execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>
>>>>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>
>>>>>>>>> I need to double check what that would mean for correctness and all other aspects.
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Please report back on how the patch behaves, including any side effects.
>>>>>>>>>>
>>>>>>>>>> Now I'm testing state reading with the processor API vs the mentioned job where we control the keys. The difference is extreme, especially because the numbers are coming from reading a ~40 MB state file😅
>>>>>>>>>>
>>>>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>> ...
>>>>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>
>>>>>>>>>> No need to mention that the bigger one is the processor API.
>>>>>>>>>>
>>>>>>>>>> G
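
On the system-property toggle Jean-Marc mentions above: a common way to carry such a local patch is to guard the changed line behind a JVM property so the stock behaviour stays the default. Purely illustrative; the class, property name and call site below are made up, not actual Flink code:

    // Hypothetical flag: enable the patched behaviour with
    // -Dstate.processor.skip-key-remove=true on the reader's JVM.
    public final class StateReaderPatchFlag {

        private static final boolean SKIP_KEY_REMOVE =
                Boolean.getBoolean("state.processor.skip-key-remove");

        private StateReaderPatchFlag() {}

        public static boolean skipKeyRemove() {
            return SKIP_KEY_REMOVE;
        }
    }

    // ...and at the patched call site (exact location depends on your fork):
    // if (!StateReaderPatchFlag.skipKeyRemove()) {
    //     keysToRemove.remove(key);   // original behaviour kept by default
    // }
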
>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>
>>>>>>>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>>>>>>>
>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109 <https://issues.apache.org/jira/browse/FLINK-37109> first to see how that goes. If that brings RocksDB performance into an acceptable range for us, we might go that way. I really like the light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> JM
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What I could imagine is to create a normal Flink job, use execution.state-recovery.path=/path/to/savepoint, and set the operator UID on a custom-written operator, which opens the state info for you.
>>>>>>>>>>>>
>>>>>>>>>>>> The only drawback is that you must know the keyBy range... this can be problematic, but if you can do it, it's a win :)
>>>>>>>>>>>>
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in pages, instead of as a single blob up front, which I think is what the HashMap does... we just want to be called for each entry and extract the bit we want in that scenario.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Never mind.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not reaching the performance level it should. There is an open issue for it [1].
>>>>>>>>>>>>>> The HashMap-based approach has always required more memory. So if the memory footprint is a hard requirement, then RocksDB is the only way now.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Bad to say, but I can't suggest any nifty trick to make it better. All I can promise is that I'm now measuring the performance of the RocksDB approach and intend to eliminate the slowness. Since we don't know what exactly causes the slowness, the new FRocksDB 8.10.0 can also be an improvement.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>> G
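
For reference, the job-based approach Gabor describes above (a normal streaming job restored from the savepoint via execution.state-recovery.path, whose stateful operator carries the original operator's UID and is fed the keys you already know) could look roughly like the sketch below. The UID, state descriptor, key type and savepoint path are placeholders that would have to match the original job; this is an illustration, not a tested recipe.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class SavepointDumpJob {

        // Placeholder: must be the UID of the stateful operator in the original job.
        private static final String MAIN_OPERATOR_UID = "main-operator";

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Restore from the savepoint instead of starting with empty state.
            conf.setString("execution.state-recovery.path", "s3://bucket/savepoints/savepoint-xxxx");
            // Depending on what else is in the savepoint, you may also need to
            // allow non-restored state for operators this job does not declare.

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.setParallelism(1);

            env.fromData("key-1", "key-2", "key-3") // the keyBy range you must already know
                    .keyBy(k -> k)                  // key selector/type must match the original job
                    .process(new KeyedProcessFunction<String, String, String>() {
                        private transient ValueState<String> state;

                        @Override
                        public void open(Configuration parameters) {
                            // Descriptor must match the one registered by the original operator.
                            state = getRuntimeContext().getState(
                                    new ValueStateDescriptor<>("state", Types.STRING));
                        }

                        @Override
                        public void processElement(String key, Context ctx, Collector<String> out)
                                throws Exception {
                            // The restored value for this key, if any.
                            out.collect(key + " -> " + state.value());
                        }
                    })
                    .uid(MAIN_OPERATOR_UID)
                    .print();

            env.execute("savepoint-dump");
        }
    }
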
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What would be the best approach to read a savepoint while minimising memory consumption? We just need to transform it into something else for investigation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is spread over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reader is more basic, using a local execution environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>     // extract what we need here
>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works and is low on memory usage, but is very, very slow. We noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That is very fast, as expected. However, the iterator apparently only returns once the whole savepoint has been loaded into the HashMap, so heavy memory consumption.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it does not consume all the memory? Or maybe reading it in parts...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JM
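
For readers following along: StateManagerJsonNodeReaderFunction above is Jean-Marc's own class and is not shown in the thread. The State Processor API reader it builds on follows the pattern below; the state name, types and output record are illustrative only, a minimal sketch rather than the actual implementation:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    public class MyStateReaderFunction
            extends KeyedStateReaderFunction<String, Tuple2<String, String>> {

        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // Name and type must match the descriptor used by the original job.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state", Types.STRING));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out)
                throws Exception {
            // Emit only the extracted fields, not the whole state object, so little
            // has to travel through executeAndCollect() on the client side.
            out.collect(Tuple2.of(key, state.value()));
        }
    }
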