Just a little update on this. We've built our first POC with the redesigned approach and the numbers are promising :) It still requires a huge effort from a development/correctness/performance perspective, but it seems we have something in the pocket.
Test data: 256 MB state file with a single operator and 2 value states
- Old execution time: 25M27.126737S
- New execution time: 1M19.602042S

In short: ~95% performance gain.

G

On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> In short, when you don't care about multiple KeyedStateReaderFunction.readKey calls then you're on the safe side.
>
> G
>
> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> I am still hoping that I am good. I just read the savepoint to extract information (parallelism 1, and only 1 task manager). I also know it has been created by a job using a HashMap backend. And I do not care about duplicates.
>>
>> I should still be good, right? From what I saw I never read any duplicate keys.
>>
>> Thanks
>>
>> JM
>>
>>
>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Guys,
>>>
>>> We've just had an in-depth analysis and we think that removing that particular line causes correctness issues under some circumstances.
>>>
>>> Namely, key duplicates can happen when multiple column families are processed at the same time. Needless to say, that would cause multiple `readKey` calls, which ends up in plain wrong calculation logic (for example in a simple sum calculation).
>>>
>>> We have a vision of how this can be solved in a clean way, but it will take some time.
>>>
>>> > Are there any plans on a migration guide or something for users to adapt their QS observers (beyond the current docs)?
>>>
>>> The gap between the two approaches is quite huge, and considering the actual bugs and improvement possibilities in the State Processor API, I would say this can come later, at least on my plate. When you see the gaps and you know how to fill them, feel free to contribute and we can shepherd the PRs.
>>>
>>> G
>>>
>>>
>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>
>>>> Thanks both for your work on this!
>>>>
>>>> On a related note, since Queryable State (QS) is going away soon, streamlining the State Processor API as much as possible makes a lot of sense.
>>>>
>>>> Are there any plans on a migration guide or something for users to adapt their QS observers (beyond the current docs)? (State-)Observability-wise, Flink has some room for improvement, I would say.
>>>>
>>>> Regards,
>>>>
>>>> Salva
>>>>
>>>>
>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Jean-Marc,
>>>>>
>>>>> Thanks for investing the time and sharing the numbers, it's super helpful. Ping me any time when you have further info to share.
>>>>>
>>>>> About the numbers: 48 minutes for 6 GB is not good but not terrible. I've seen petabyte-scale states so I'm pretty sure we need to go beyond...
>>>>>
>>>>> Since we measure similar numbers with unpatched Flink, plus this has been reported by several users, we must make changes in this area. It's still a question whether the tested patch is the right approach, but at least we've touched the root cause.
>>>>>
>>>>> The next step on my side is to have a deep dive and understand all the aspects of why the remove is there, how eliminating the remove would affect existing use-cases, and consider all other possibilities.
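To make the "simple sum calculation" example discussed above concrete, here is a minimal sketch (class name, key type, and state name are hypothetical) of a KeyedStateReaderFunction whose output is summed downstream; if readKey were invoked twice for the same key, that key's value would be counted twice and the sum would be wrong:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

// Hypothetical reader: emits one counter value per key; the caller sums the emitted values.
public class CounterReaderFunction extends KeyedStateReaderFunction<String, Long> {

    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        // "counter" must match the state name used by the original job (assumed here).
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Types.LONG));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
        Long value = counter.value();
        // If readKey is called twice for the same key (the duplicate-key case above),
        // this value is emitted twice and any downstream sum is inflated.
        out.collect(value == null ? 0L : value);
    }
}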
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>
>>>>>> Hi Gabor,
>>>>>>
>>>>>> I finally got to run that change through. I have a 6 GB savepoint that I read and parse for reference.
>>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less than 2 GB)
>>>>>> - RocksDB without the patch wasn't even halfway through after 12 hours... (I gave up)
>>>>>>
>>>>>> I don't think I have any duplicates because the application that generates the savepoint is using HashMap, so my scenario may not be representative. I am using IBM Semeru Java 17 (OpenJ9 0.46).
>>>>>>
>>>>>> That was run on a VM on my laptop, so not exactly a controlled environment, but I think it's conclusive enough. I will need to run further tests, but I think we will patch our Flink, using a system property to configure it.
>>>>>>
>>>>>> Hope this helps
>>>>>>
>>>>>> JM
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Just to give an update. I've applied the mentioned patch and the execution time drastically decreased (the gain is 98.9%):
>>>>>>>
>>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>
>>>>>>> I need to double-check what that would mean for correctness and all other aspects.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Please report back on how the patch behaves, including any side effects.
>>>>>>>>
>>>>>>>> Now I'm testing state reading with the processor API vs. the mentioned job where we control the keys. The difference is extreme, especially because the numbers are coming from reading a ~40 MB state file 😅
>>>>>>>>
>>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>> ...
>>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>
>>>>>>>> Needless to say, the bigger number is the processor API.
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>
>>>>>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>>>>>
>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109 <https://issues.apache.org/jira/browse/FLINK-37109> first to see how that goes. If that brings RocksDB performance into an acceptable range for us we might go that way. I really like the light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> What I could imagine is to create a normal Flink job, use execution.state-recovery.path=/path/to/savepoint, and set the operator UID on a custom-written operator, which opens the state info for you.
>>>>>>>>>>
>>>>>>>>>> The only drawback is that you must know the keyBy range...
this >>>>>>>>>> can be problematic but if you can do it it's a win :) >>>>>>>>>> >>>>>>>>>> G >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Gabor, >>>>>>>>>>> >>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in >>>>>>>>>>> pages, instead of as a single blob up front which I think is what >>>>>>>>>>> the >>>>>>>>>>> hashmap does... we just want to be called for each entry and >>>>>>>>>>> extract the >>>>>>>>>>> bit we want in that scenario. >>>>>>>>>>> >>>>>>>>>>> Never mind >>>>>>>>>>> >>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing. >>>>>>>>>>> >>>>>>>>>>> JM >>>>>>>>>>> >>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi < >>>>>>>>>>> gabor.g.somo...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Jean-Marc, >>>>>>>>>>>> >>>>>>>>>>>> We've already realized that the RocksDB approach is not >>>>>>>>>>>> reaching the performance criteria which it should be. There is an >>>>>>>>>>>> open >>>>>>>>>>>> issue for it [1]. >>>>>>>>>>>> The hashmap based approach was and is always expecting more >>>>>>>>>>>> memory. So if the memory footprint is a hard requirement then >>>>>>>>>>>> RocksDB is >>>>>>>>>>>> the only way now. >>>>>>>>>>>> >>>>>>>>>>>> Bad to say but I can't suggest any nifty trick to make it >>>>>>>>>>>> better. All I can promise that I'm now measuring performance of >>>>>>>>>>>> the RocksDB >>>>>>>>>>>> approach >>>>>>>>>>>> and intended to eliminate the slowness. Since we don't know >>>>>>>>>>>> what exactly causes the slowness the new Frocksdb-8.10.0 can be >>>>>>>>>>>> also an >>>>>>>>>>>> imrpvement. >>>>>>>>>>>> >>>>>>>>>>>> All in all it will take some time to sort this out. >>>>>>>>>>>> >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109 >>>>>>>>>>>> >>>>>>>>>>>> BR, >>>>>>>>>>>> G >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin < >>>>>>>>>>>> jm.pau...@gmail.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> What would be the best approach to read a savepoint and >>>>>>>>>>>>> minimise the memory consumption. We just need to transform it into >>>>>>>>>>>>> something else for investigation. >>>>>>>>>>>>> >>>>>>>>>>>>> Our flink 1.20 streaming job is using HashMap backend, and is >>>>>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are >>>>>>>>>>>>> saved on S3. >>>>>>>>>>>>> A savepoint can be 4-5Gb or more. 
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reader is more basic, using a local execution environment. This is essentially what we are doing:
>>>>>>>>>>>>>
>>>>>>>>>>>>> StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>>
>>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>
>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>
>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>     // extract what we need here
>>>>>>>>>>>>> });
>>>>>>>>>>>>>
>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works and is low on memory usage, but it is very, very slow. We noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That is very fast, as expected. However, the iterator apparently only returns once the whole savepoint has been loaded into the HashMap, so memory consumption is heavy.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it does not consume all the memory? Or maybe reading it in parts...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>
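For reference, a rough sketch of the alternative Gabor outlines earlier in the thread: a normal job restored from the savepoint via execution.state-recovery.path, with the operator UID matching the one in the savepoint. The operator UID, state name, and key values below are placeholders, and the records fed into the job must cover the keyBy range you want to inspect, since a keyed function only sees state for the key currently being processed:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SavepointInspectorJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Restore from the existing savepoint instead of starting with empty state.
        conf.setString("execution.state-recovery.path", "/path/to/savepoint");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env.fromData("key-1", "key-2") // must cover the keyBy range of interest
           .keyBy(k -> k)
           .process(new KeyedProcessFunction<String, String, String>() {
               private transient ValueState<String> state;

               @Override
               public void open(Configuration parameters) {
                   // The state name must match the one used by the original job (placeholder here).
                   state = getRuntimeContext().getState(
                           new ValueStateDescriptor<>("state-name", Types.STRING));
               }

               @Override
               public void processElement(String key, Context ctx, Collector<String> out) throws Exception {
                   // Emit the restored value for this key, if present in the savepoint.
                   out.collect(key + " -> " + state.value());
               }
           })
           .uid("main-operator-uid") // must match the operator UID in the savepoint
           .print();

        env.execute("savepoint-inspector");
    }
}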