I am hoping that I am still good. I only read the savepoint to extract information (parallelism 1, and only 1 task manager). I also know it was created by a job using a HashMap backend. And I do not care about duplicates.
I should still be good, right? From what I saw I never read any duplicate keys.

Thanks

JM

On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Guys,
>
> We've just had an in-depth analysis and we think that removing that particular line causes correctness issues under some circumstances.
>
> Namely, key duplicates can happen when multiple column families are processed at the same time. Needless to say, that would cause multiple `readKey` calls, which ends up in plainly wrong calculation logic (for example in a simple sum calculation).
>
> We have a vision of how this can be solved in a clean way, but it will take some time.
>
>> Are there any plans for a migration guide or something for users to adapt their QS observers (beyond the current docs)?
>
> The gap between the two approaches is quite huge, and considering the actual bugs and improvement possibilities in the State Processor API, I would say this can come later, at least on my plate. When you see the gaps and you know how to fill them, feel free to contribute and we can shepherd the PRs.
>
> G
>
> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>
>> Thanks both for your work on this!
>>
>> On a related note, since Queryable State (QS) is going away soon, streamlining the State Processor API as much as possible makes a lot of sense.
>>
>> Are there any plans for a migration guide or something for users to adapt their QS observers (beyond the current docs)? (State-)observability-wise, Flink has some room for improvement, I would say.
>>
>> Regards,
>>
>> Salva
>>
>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> Thanks for investing your time and sharing the numbers, it's super helpful. Ping me any time when you have further info to share.
>>>
>>> About the numbers: 48 minutes for 6 GB is not good, but not terrible. I've seen petabyte-scale states, so I'm pretty sure we need to go beyond...
>>>
>>> Since we measure similar numbers with unpatched Flink, and this has been reported by several users, we must make changes in this area. It's still a question whether the tested patch is the right approach, but at least we've touched the root cause.
>>>
>>> The next step on my side is to do a deep dive and understand all the aspects of why the remove is there, how eliminating it would affect existing use cases, and to consider all other possibilities.
>>>
>>> BR,
>>> G
>>>
>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> I finally got to run that change through. I have a 6 GB savepoint that I read and parse for reference:
>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>> - RocksDB with the patch reads it in 48 minutes (and requires less than 2 GB)
>>>> - RocksDB without the patch wasn't even halfway through after 12 hours... (I gave up)
>>>>
>>>> I don't think I have any duplicates because the application that generates the savepoint is using HashMap, so my scenario may not be representative. I am using IBM Semeru Java 17 (OpenJ9 0.46).
>>>>
>>>> That was run on a VM on my laptop, so not exactly a controlled environment, but I think it's conclusive enough. I will need to run further tests, but I think we will patch our Flink, using a system property to configure it.
>>>>
>>>> Hope this helps
>>>>
>>>> JM
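A switch of the kind JM mentions is usually just a static guard around the patched code path. A minimal sketch in plain Java, where the property name is made up (not an existing Flink option) and the guarded call stands in for whatever line the patch removes, not the actual code from FLINK-37109:

    // Hypothetical toggle: enabled with -Dflink.state.reader.skip-remove=true on the reader's JVM.
    public final class ReaderPatchToggle {

        private static final boolean SKIP_REMOVE =
                Boolean.getBoolean("flink.state.reader.skip-remove");

        public static boolean skipRemove() {
            return SKIP_REMOVE;
        }
    }

The patched site would then wrap the original call in `if (!ReaderPatchToggle.skipRemove()) { ... }`, so the default behaviour stays unchanged unless the property is set.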
>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Just to give an update. I've applied the mentioned patch and the execution time drastically decreased (the gain is 98.9%):
>>>>>
>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>
>>>>> I need to double check what that would mean for correctness and all other aspects.
>>>>>
>>>>> G
>>>>>
>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Please report back on how the patch behaves, including any side effects.
>>>>>>
>>>>>> Now I'm testing state reading with the processor API vs. the mentioned job where we control the keys. The difference is extreme, especially because the numbers are coming from reading a ~40 MB state file 😅
>>>>>>
>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>> ...
>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>
>>>>>> Needless to say, the bigger number is the processor API.
>>>>>>
>>>>>> G
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>
>>>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>>>
>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109 <https://issues.apache.org/jira/browse/FLINK-37109> first to see how that goes. If that brings RocksDB performance into an acceptable range for us, we might go that way. I really like the light memory consumption of RocksDB for that kind of side job.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> JM
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> What I could imagine is to create a normal Flink job, use execution.state-recovery.path=/path/to/savepoint and set the operator UID on a custom written operator, which opens the state info for you.
>>>>>>>>
>>>>>>>> The only drawback is that you must know the keyBy range... this can be problematic, but if you can do it, it's a win :)
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>
>>>>>>>>> Hi Gabor,
>>>>>>>>>
>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in pages, instead of as a single blob up front, which I think is what the HashMap does... We just want to be called for each entry and extract the bit we need in that scenario.
>>>>>>>>>
>>>>>>>>> Never mind.
>>>>>>>>>
>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>
>>>>>>>>> JM
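For reference, a rough sketch of the kind of job G describes above (restoring the savepoint via execution.state-recovery.path and reusing the original operator UID so that its keyed state becomes readable) could look like the following. The key range, operator UID, state name, and state type are assumptions and must match the original job; if the original job has other stateful operators, non-restored state would also have to be allowed.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class SavepointInspectionJob {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Restore the job from the savepoint instead of starting with empty state.
            conf.setString("execution.state-recovery.path", "/path/to/savepoint");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.setParallelism(1);

            // The keyBy range must be known up front; these keys are placeholders.
            env.fromElements("key-1", "key-2", "key-3")
                    .keyBy(k -> k)
                    .process(new KeyedProcessFunction<String, String, String>() {
                        private transient ValueState<String> state;

                        @Override
                        public void open(Configuration parameters) {
                            // Name and type must match the state registered by the original operator.
                            state = getRuntimeContext().getState(
                                    new ValueStateDescriptor<>("state-manager", String.class));
                        }

                        @Override
                        public void processElement(String key, Context ctx, Collector<String> out)
                                throws Exception {
                            // The state restored from the savepoint is visible here for the current key.
                            out.collect(key + " -> " + state.value());
                        }
                    })
                    .uid("main-operator") // must match the UID of the operator in the original job
                    .print();

            env.execute("savepoint-inspection");
        }
    }

The appeal of this approach is that state is served by a regular restored backend as the keys stream through, instead of being materialised through the State Processor API; the cost is having to enumerate the keys up front.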
>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>
>>>>>>>>>> We've already realized that the RocksDB approach is not reaching the performance level it should. There is an open issue for it [1]. The HashMap-based approach has always required more memory. So if the memory footprint is a hard requirement, then RocksDB is the only way for now.
>>>>>>>>>>
>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it better. All I can promise is that I'm now measuring the performance of the RocksDB approach and intend to eliminate the slowness. Since we don't know what exactly causes the slowness, the new FRocksDB 8.10.0 can also be an improvement.
>>>>>>>>>>
>>>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>
>>>>>>>>>> BR,
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What would be the best approach to read a savepoint and minimise the memory consumption? We just need to transform it into something else for investigation.
>>>>>>>>>>>
>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is spread over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>
>>>>>>>>>>> The reader is more basic, using a local execution environment. This is essentially what we are doing:
>>>>>>>>>>>
>>>>>>>>>>> StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>
>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>
>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>
>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>     ... // extract what we need here
>>>>>>>>>>> });
>>>>>>>>>>>
>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works and is low on memory usage, but it is very, very slow. We noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That is very fast, as expected. However, the iterator apparently only returns once the whole savepoint has been loaded into the HashMap, hence the heavy memory consumption.
>>>>>>>>>>>
>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it does not consume all the memory? Or maybe read it in parts...
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> JM
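For context on the snippet above: StateManagerJsonNodeReaderFunction is application-specific, but a minimal KeyedStateReaderFunction for the State Processor API generally follows the shape below. The key type, state name, and value type here are assumptions and must match what the original operator registered.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    public class MainOperatorReaderFunction extends KeyedStateReaderFunction<String, String> {

        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor name and type must match what the original operator registered.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state-manager", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            // Called once per key found in the savepoint; emit only what is needed downstream.
            out.collect(key + " -> " + state.value());
        }
    }

Each key in the savepoint triggers one readKey call, so keeping the emitted records small is what keeps the downstream executeAndCollect iterator cheap.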