Hi Gabor,

I finally got to run that change through. I have a 6 GB savepoint that I read and parse for reference.

- HashMap reads it in 14 minutes (but requires 10 GB of RAM)
- RocksDB with the patch reads it in 48 minutes (and requires less than 2 GB)
- RocksDB without the patch wasn't even halfway through after 12 hours... (I gave up)
I don't think I have any duplicates, because the application that generates the savepoint uses HashMap, so my scenario may not be representative. I am using IBM Semeru Java 17 (OpenJ9 0.46). That was run on a VM on my laptop, so not exactly a controlled environment, but I think it's conclusive enough. I will need to run further tests, but I think we will patch our Flink, using a system property to configure it.

Hope this helps

JM

On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Just to give an update. I've applied the mentioned patch and the execution
> time drastically decreased (the gain is 98.9%):
>
> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader
> [] - Execution time: PT14.690426S
>
> I need to double-check what that would mean for correctness and all other
> aspects.
>
> G
>
>
> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Please report back on how the patch behaves, including any side effects.
>>
>> I'm now testing state reading with the processor API vs the mentioned
>> job where we control the keys.
>> The difference is extreme, especially because the numbers are coming from
>> reading a ~40 MB state file😅
>>
>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader
>> [] - Execution time: PT22M24.612954S
>> ...
>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob
>> [] - Execution time: PT6.930659S
>>
>> Needless to say, the bigger one is the processor API.
>>
>> G
>>
>>
>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>
>>> That's a good idea. Sadly I have no control over the keys...
>>>
>>> I was going to patch Flink with the suggestion in FLINK-37109
>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how
>>> that goes. If that brings RocksDB performance into an acceptable range for us
>>> we might go that way. I really like the light memory consumption of RocksDB
>>> for that kind of side job.
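A gate like the system property mentioned above could be as small as the sketch below. This is only an illustration of the idea; the property name `flink.savepoint.dedup.patch.enabled` is made up and is not an actual Flink option.

```java
// Minimal sketch of a system-property switch for the patched behavior.
// The property name is hypothetical, chosen only for illustration.
public class PatchToggle {
    static final String KEY = "flink.savepoint.dedup.patch.enabled";

    // Defaults to false so the unpatched behavior stays the default
    // unless -Dflink.savepoint.dedup.patch.enabled=true is passed.
    static boolean patchEnabled() {
        return Boolean.parseBoolean(System.getProperty(KEY, "false"));
    }

    public static void main(String[] args) {
        System.out.println("patch enabled: " + patchEnabled());
    }
}
```

The patched code path would then be wrapped in an `if (patchEnabled())` check, so a rollback is just removing the JVM flag.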
>>>
>>> Thanks
>>>
>>> JM
>>>
>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> What I could imagine is to create a normal Flink job,
>>>> use execution.state-recovery.path=/path/to/savepoint,
>>>> and set the operator UID on a custom-written operator, which opens the
>>>> state info for you.
>>>>
>>>> The only drawback is that you must know the keyBy range... this can be
>>>> problematic, but if you can do it it's a win :)
>>>>
>>>> G
>>>>
>>>>
>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>>>> instead of as a single blob up front, which I think is what the hashmap
>>>>> backend does... we just want to be called for each entry and extract the bit we
>>>>> want in that scenario.
>>>>>
>>>>> Never mind.
>>>>>
>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>
>>>>> JM
>>>>>
>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jean-Marc,
>>>>>>
>>>>>> We've already realized that the RocksDB approach is not reaching the
>>>>>> performance level it should. There is an open issue for it [1].
>>>>>> The hashmap-based approach has always required more memory.
>>>>>> So if the memory footprint is a hard requirement, then RocksDB is the only
>>>>>> way now.
>>>>>>
>>>>>> Sad to say, but I can't suggest any nifty trick to make it better. All
>>>>>> I can promise is that I'm now measuring performance of the RocksDB approach
>>>>>> and intend to eliminate the slowness. Since we don't know what
>>>>>> exactly causes the slowness, the new Frocksdb-8.10.0 can also be an
>>>>>> improvement.
>>>>>>
>>>>>> All in all, it will take some time to sort this out.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What would be the best approach to read a savepoint and minimise the
>>>>>>> memory consumption? We just need to transform it into something else for
>>>>>>> investigation.
>>>>>>>
>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is spread
>>>>>>> over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A
>>>>>>> savepoint can be 4-5 GB or more.
>>>>>>>
>>>>>>> The reader is more basic, using a local execution environment. This is
>>>>>>> essentially what we are doing:
>>>>>>>
>>>>>>> StreamExecutionEnvironment env =
>>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>> env.setParallelism(1);
>>>>>>>
>>>>>>> SavepointReader savepoint =
>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>
>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>     savepoint.readKeyedState(
>>>>>>>         MAIN_OPERATOR,
>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>
>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>     // extract what we need here
>>>>>>> });
>>>>>>>
>>>>>>> We tried two approaches:
>>>>>>> - One is to read the savepoint with a RocksDB backend. That works
>>>>>>> and is low on memory usage, but is very, very slow. We noticed the iterator
>>>>>>> is available very early on, but it is slow...
>>>>>>> - The other is to read the savepoint with a HashMap backend. That
>>>>>>> is very fast, as expected.
>>>>>>> However, the iterator apparently only returns
>>>>>>> once the whole savepoint has been loaded into the HashMap, hence heavy memory
>>>>>>> consumption.
>>>>>>>
>>>>>>> Is there a better way to do that? Or a way to tune it so that it
>>>>>>> does not consume all the memory? Or maybe reading it in parts...
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> JM
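The trade-off described in the thread can be pictured with plain Java, independent of Flink: a streaming read hands each entry to the caller as it is decoded (RocksDB-style, low peak memory), while a materializing read builds the whole map before the first entry is usable (HashMap-style, fast but O(n) memory). A toy sketch, with all names made up for illustration:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration only (no Flink APIs): streaming vs materialize-then-iterate.
public class PagedReadSketch {

    // Streaming style: entries are handed out one at a time;
    // peak memory is a single entry, and the first entry is available immediately.
    static Iterator<String> streamEntries(List<String> source) {
        return source.iterator();
    }

    // Materializing style: everything is loaded before the caller sees anything;
    // peak memory is proportional to the whole "savepoint".
    static Map<String, String> loadAll(List<String> source) {
        Map<String, String> all = new LinkedHashMap<>();
        for (String s : source) {
            all.put(s, s.toUpperCase());
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> fakeSavepoint = List.of("a", "b", "c");

        // Streaming: we can act on the first entry before the rest is read.
        System.out.println("first streamed entry: " + streamEntries(fakeSavepoint).next());

        // Materialized: nothing is usable until loadAll has finished.
        System.out.println("materialized entries: " + loadAll(fakeSavepoint).size());
    }
}
```

This is the distinction JM is after: whether `executeAndCollect()` behaves like `streamEntries` or like `loadAll` depends on the state backend doing the reading.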