Just to give an update: I've applied the mentioned patch and the execution time decreased drastically (the gain is 98.9%):
2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S

I need to double check what that means for correctness and all the other aspects.

G

On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Please report back on how the patch behaves, including any side effects.
>
> I'm now testing the state reading with the processor API vs the mentioned
> job where we control the keys.
> The difference is extreme, especially because the numbers come from
> reading a ~40 MB state file😅
>
> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
> ...
> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>
> Needless to say, the bigger number is the processor API.
>
> G
>
> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> That's a good idea. Sadly, I have no control over the keys...
>>
>> I was going to patch Flink with the suggestion in FLINK-37109
>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how
>> that goes. If that brings RocksDB performance into an acceptable range
>> for us, we might go that way. I really like the light memory consumption
>> of RocksDB for that kind of side job.
>>
>> Thanks
>>
>> JM
>>
>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> What I could imagine is to create a normal Flink job,
>>> use execution.state-recovery.path=/path/to/savepoint and
>>> set the operator UID on a custom-written operator, which opens the
>>> state info for you.
>>>
>>> The only drawback is that you must know the keyBy range... this can be
>>> problematic, but if you can do it, it's a win :)
>>>
>>> G
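For illustration, here is a minimal sketch of the job-based approach described above: a regular Flink job started with execution.state-recovery.path pointing at the savepoint, a bounded source emitting the known keys, and a keyed operator whose UID matches the stateful operator in the original job. The UID ("MAIN_OPERATOR"), the key values, the savepoint path, and the ValueStateDescriptor name and type are placeholders and must be replaced with the original job's values; allowing non-restored state may also be needed when the savepoint contains state for other operators.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SavepointDumpJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Restore from the savepoint instead of starting with empty state.
        conf.setString("execution.state-recovery.path", "s3://bucket/path/to/savepoint");
        // Likely needed when the savepoint also holds state for operators not present in this job.
        conf.setBoolean("execution.savepoint.ignore-unclaimed-state", true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // The keyBy range must be known up front: only keys that flow through the
        // operator have their restored state read.
        env.fromElements("key-1", "key-2", "key-3")
                .keyBy(key -> key)
                .process(new StateDumpFunction())
                .uid("MAIN_OPERATOR") // must match the UID of the stateful operator in the original job
                .print();

        env.execute("savepoint-dump");
    }

    /** Emits the restored keyed state for every incoming key. */
    public static class StateDumpFunction extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // Placeholder descriptor; name and type must match the original job's state.
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", String.class));
        }

        @Override
        public void processElement(String key, Context ctx, Collector<String> out) throws Exception {
            String value = state.value();
            if (value != null) {
                out.collect(key + " -> " + value);
            }
        }
    }
}

Because the source is bounded, the job reads the restored state for the supplied keys, emits it, and then finishes.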
>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>>> instead of as a single blob up front, which I think is what the hashmap
>>>> backend does... we just want to be called for each entry and extract the
>>>> bit we want in that scenario.
>>>>
>>>> Never mind.
>>>>
>>>> Thank you for the insight. It saves me a lot of hunting for nothing.
>>>>
>>>> JM
>>>>
>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi Jean-Marc,
>>>>>
>>>>> We've already realized that the RocksDB approach is not reaching the
>>>>> performance it should. There is an open issue for it [1].
>>>>> The hashmap-based approach has always required more memory, so if the
>>>>> memory footprint is a hard requirement, then RocksDB is the only way now.
>>>>>
>>>>> Sad to say, but I can't suggest any nifty trick to make it better. All
>>>>> I can promise is that I'm now measuring the performance of the RocksDB
>>>>> approach and intend to eliminate the slowness. Since we don't know what
>>>>> exactly causes the slowness, the new Frocksdb-8.10.0 can also be an
>>>>> improvement.
>>>>>
>>>>> All in all, it will take some time to sort this out.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>
>>>>>> What would be the best approach to read a savepoint while minimising
>>>>>> memory consumption? We just need to transform it into something else
>>>>>> for investigation.
>>>>>>
>>>>>> Our Flink 1.20 streaming job uses the HashMap state backend and is
>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are saved
>>>>>> on S3. A savepoint can be 4-5 GB or more.
>>>>>>
>>>>>> The reader is more basic and uses a local execution environment. This
>>>>>> is essentially what we are doing:
>>>>>>
>>>>>> StreamExecutionEnvironment env =
>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>> env.setParallelism(1);
>>>>>>
>>>>>> SavepointReader savepoint =
>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>
>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>     savepoint.readKeyedState(
>>>>>>         MAIN_OPERATOR,
>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>
>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>     mainOperatorState.executeAndCollect();
>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>     // extract what we need here
>>>>>> });
>>>>>>
>>>>>> We tried two approaches:
>>>>>> - One is to read the savepoint with a RocksDB backend. That works and
>>>>>> is low on memory usage, but it is very, very slow. We noticed the
>>>>>> iterator is available very early on, but it is slow...
>>>>>> - The other is to read the savepoint with a HashMap backend. That is
>>>>>> very fast, as expected. However, the iterator apparently only returns
>>>>>> once the whole savepoint has been loaded into the HashMap, so memory
>>>>>> consumption is heavy.
>>>>>>
>>>>>> Is there a better way to do this? Or a way to tune it so that it does
>>>>>> not consume all the memory? Or maybe read it in parts...
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> JM
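For reference, a reader function for SavepointReader.readKeyedState, along the lines of the StateManagerJsonNodeReaderFunction used above, would look roughly like the sketch below. The class name, state name, key type, and output type are placeholders and must match the state registered by the original operator.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateDumpReaderFunction extends KeyedStateReaderFunction<String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        // Placeholder descriptor; name and type must match the original job's state.
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
        // Invoked once for every key of this operator found in the savepoint.
        out.collect(key + " -> " + state.value());
    }
}

The reader function is the same for both backends; only the state backend passed to SavepointReader.read changes, which is why the speed/memory trade-off described above comes entirely from the backend choice.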