In short, if you don't care about multiple KeyedStateReaderFunction.readKey calls, then you're on the safe side.
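To make that concrete: a toy, plain-Java sketch (not Flink API; class and method names are invented for illustration) of why duplicate readKey invocations are harmless for idempotent extraction but silently corrupt a non-idempotent aggregation such as a sum:

```java
import java.util.List;
import java.util.Map;

public class DuplicateReadKeyDemo {
    // Simulates a reader function that accumulates a per-key sum.
    // If the framework invokes it twice for the same key (the duplicate
    // column-family case discussed above), the contribution is doubled.
    static long sumWithReads(List<String> keysRead, Map<String, Long> state) {
        long total = 0;
        for (String key : keysRead) {
            total += state.getOrDefault(key, 0L); // non-idempotent aggregation
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> state = Map.of("a", 10L, "b", 5L);
        System.out.println(sumWithReads(List.of("a", "b"), state));      // 15 (each key once)
        System.out.println(sumWithReads(List.of("a", "b", "a"), state)); // 25 ("a" read twice)
    }
}
```

If the reader only extracts and emits each record (as in JM's use case), a second call for the same key just produces a duplicate output record, which can be ignored.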
G

On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> I am still hoping that I am good. I just read the savepoint to
> extract information (parallelism 1, and only 1 task manager). I also know
> it has been created by a job using a HashMap backend. And I do not care
> about duplicates.
>
> I should still be good, right? From what I saw I never read any duplicate
> keys.
>
> Thanks
>
> JM
>
> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> We've just had an in-depth analysis and we think that removing that
>> particular line causes correctness issues under some circumstances.
>>
>> Namely, key duplicates can happen when multiple column families are
>> processed at the same time. Needless to say, this would cause multiple
>> `readKey` calls, which ends up in just wrong calculation logic (for
>> example in a simple sum calculation).
>>
>> We have a vision of how this can be solved in a clean way, but it will
>> take some time.
>>
>> > Are there any plans on a migration guide or something for users to
>> adapt their QS observers (beyond the current docs)?
>>
>> The gap between the two approaches is quite huge, and considering the
>> actual bugs and improvement possibilities in the state processor API,
>> I would say this can come later, at least on my plate. When you see the
>> gaps and you know how to fill them, feel free to contribute and
>> we can shepherd the PRs.
>>
>> G
>>
>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Thanks both for your work on this!
>>>
>>> On a related note, since Queryable State (QS) is going away soon,
>>> streamlining the State Processor API as much as possible makes a lot of
>>> sense.
>>>
>>> Are there any plans on a migration guide or something for users to adapt
>>> their QS observers (beyond the current docs)? (State-)Observability-wise
>>> Flink has some room for improvement, I would say.
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jean-Marc,
>>>>
>>>> Thanks for investing your time and sharing the numbers, it's super
>>>> helpful. Ping me any time when you have further info to share.
>>>>
>>>> About the numbers: 48 minutes for 6 GB is not good but not terrible.
>>>> I've seen petabyte-scale states, so I'm pretty sure we need to go
>>>> beyond...
>>>>
>>>> Since we measure similar numbers with unpatched Flink, and this has
>>>> been reported by several users, we must make changes in this area.
>>>> It's still a question whether the tested patch is the right approach,
>>>> but at least we've touched the root cause.
>>>>
>>>> The next step on my side is to have a deep dive and understand all the
>>>> aspects of why the remove is there, how eliminating the remove would
>>>> affect existing use-cases, and consider all other possibilities.
>>>>
>>>> BR,
>>>> G
>>>>
>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> I finally got to run that change through. I have a 6 GB savepoint I
>>>>> read and parse for reference.
>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less
>>>>> than 2 GB)
>>>>> - RocksDB without the patch wasn't even halfway through after 12
>>>>> hours... (I gave up)
>>>>>
>>>>> I don't think I have any duplicates because the application that
>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>> representative. I am using IBM Semeru Java 17 (OpenJ9 0.46).
>>>>>
>>>>> That was run on a VM on my laptop, so not exactly a controlled
>>>>> environment, but I think it's conclusive enough. I will need to
>>>>> run further tests, but I think we will patch our Flink, using a system
>>>>> property to configure it.
>>>>>
>>>>> Hope this helps
>>>>>
>>>>> JM
>>>>>
>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>
>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader
>>>>>> [] - Execution time: PT14.690426S
>>>>>>
>>>>>> I need to double-check what that would mean for correctness and all
>>>>>> other aspects.
>>>>>>
>>>>>> G
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Please report back on how the patch behaves, including any side
>>>>>>> effects.
>>>>>>>
>>>>>>> Now I'm testing state reading with the processor API vs the
>>>>>>> mentioned job where we control the keys.
>>>>>>> The difference is extreme, especially because the numbers are coming
>>>>>>> from reading a ~40 MB state file 😅
>>>>>>>
>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader
>>>>>>> [] - Execution time: PT22M24.612954S
>>>>>>> ...
>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob
>>>>>>> [] - Execution time: PT6.930659S
>>>>>>>
>>>>>>> Needless to say, the bigger one is the processor API.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>>>>
>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see
>>>>>>>> how that goes. If that brings RocksDB performance into an
>>>>>>>> acceptable range for us, we might go that way. I really like the
>>>>>>>> light memory consumption of RocksDB for that kind of side job.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> JM
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>>>>>> set the operator UID on a custom written operator, which opens the
>>>>>>>>> state info for you.
>>>>>>>>>
>>>>>>>>> The only drawback is that you must know the keyBy range... this
>>>>>>>>> can be problematic, but if you can do it, it's a win :)
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gabor,
>>>>>>>>>>
>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>> pages, instead of as a single blob up front, which I think is
>>>>>>>>>> what the hashmap does... we just want to be called for each entry
>>>>>>>>>> and extract the bit we want in that scenario.
>>>>>>>>>>
>>>>>>>>>> Never mind.
>>>>>>>>>>
>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>>
>>>>>>>>>> JM
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>
>>>>>>>>>>> We've already realized that the RocksDB approach is not reaching
>>>>>>>>>>> the performance criteria it should. There is an open issue for
>>>>>>>>>>> it [1].
>>>>>>>>>>> The hashmap-based approach always was, and still is, expecting
>>>>>>>>>>> more memory. So if the memory footprint is a hard requirement,
>>>>>>>>>>> then RocksDB is the only way now.
>>>>>>>>>>>
>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it
>>>>>>>>>>> better. All I can promise is that I'm now measuring performance
>>>>>>>>>>> of the RocksDB approach and intend to eliminate the slowness.
Since we don't know what
>>>>>>>>>>> exactly causes the slowness, the new FRocksDB 8.10.0 can also be
>>>>>>>>>>> an improvement.
>>>>>>>>>>>
>>>>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>>>>
>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>
>>>>>>>>>>> BR,
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>> minimise the memory consumption? We just need to transform it
>>>>>>>>>>>> into something else for investigation.
>>>>>>>>>>>>
>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and
>>>>>>>>>>>> is spread over 6 task slots in 6 pods (under k8s). Savepoints
>>>>>>>>>>>> are saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>
>>>>>>>>>>>> The reader is more basic, using a local execution environment.
>>>>>>>>>>>> This is essentially what we are doing:
>>>>>>>>>>>>
>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>
>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation,
>>>>>>>>>>>>         new HashMapStateBackend());
>>>>>>>>>>>> // SavepointReader.read(env, savepointLocation,
>>>>>>>>>>>> //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>
>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>>>     mainOperatorState =
>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>
>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>>>     stateReader = mainOperatorState.executeAndCollect();
>>>>>>>>>>>> stateReader.forEachRemaining(record -> { ...
>>>>>>>>>>>>     // extract what we need here
>>>>>>>>>>>> });
>>>>>>>>>>>>
>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That
>>>>>>>>>>>> works and is low on memory usage, but is very, very slow. We
>>>>>>>>>>>> noticed the iterator is available very early on, but it is
>>>>>>>>>>>> slow...
>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend.
>>>>>>>>>>>> That is very fast, as expected. However, the iterator
>>>>>>>>>>>> apparently only returns once the whole savepoint has been
>>>>>>>>>>>> loaded into the HashMap, so heavy memory consumption.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that
>>>>>>>>>>>> it does not consume all the memory? Or maybe reading it in
>>>>>>>>>>>> parts...
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> JM
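As an aside on the trade-off JM observed (the HashMap backend materializes the whole savepoint before the iterator yields, while the RocksDB backend streams records as it goes), the memory difference comes down to two access patterns. A toy, plain-Java sketch (not Flink API; names invented) contrasting them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadPatternDemo {
    // Pattern A: materialize everything, then iterate (HashMap-like).
    // Peak retained size equals the whole state.
    static int peakMaterialized(Stream<String> records, Consumer<String> sink) {
        List<String> all = records.collect(Collectors.toList()); // full copy in memory
        all.forEach(sink);
        return all.size(); // everything was retained at once
    }

    // Pattern B: hand each record to the sink as it arrives (RocksDB-iterator-like).
    // Peak retained size is one record, regardless of state size.
    static int peakStreamed(Stream<String> records, Consumer<String> sink) {
        records.forEach(sink); // each record is droppable after the callback
        return 1;
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        System.out.println(peakMaterialized(Stream.of("k1", "k2", "k3"), out::add)); // 3
        System.out.println(peakStreamed(Stream.of("k1", "k2", "k3"), out::add));     // 1
    }
}
```

This is only an illustration of the shape of the problem: as the thread concludes, there is currently no supported way to get the HashMap backend's speed with pattern B's memory profile, which is what FLINK-37109 is about.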