That's a good idea. Sadly, I have no control over the keys... I was going to patch Flink with the suggestion in FLINK-37109 <https://issues.apache.org/jira/browse/FLINK-37109> first to see how that goes. If that brings RocksDB performance into an acceptable range for us, we might go that way. I really like the light memory consumption of RocksDB for that kind of side job.
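For anyone finding this in the archives, this is roughly what I understood the
suggestion to be: start a throwaway job that restores from the savepoint and
reads the state through an operator with a matching UID. A sketch only,
assuming you do know your keys; the key type, state descriptor, UID and
savepoint path below are invented and would have to match the original job:

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Restore from the savepoint instead of starting with empty state.
Configuration conf = new Configuration();
conf.setString("execution.state-recovery.path", "s3://bucket/savepoints/savepoint-xyz");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

env.fromData("key-1", "key-2") // the keys to inspect -- this is the catch
    .keyBy(k -> k)             // must reproduce the original job's keyBy
    .process(new KeyedProcessFunction<String, String, String>() {
        private transient ValueState<String> state;

        @Override
        public void open(OpenContext openContext) {
            // name and type must match the state the original operator registered
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void processElement(String key, Context ctx, Collector<String> out)
                throws Exception {
            // the value comes back restored from the savepoint
            out.collect(key + " -> " + state.value());
        }
    })
    .uid("main-operator-uid") // must match the UID of the stateful operator
    .print();

env.execute("savepoint-inspector");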
Thanks

JM

On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> What I could imagine is to create a normal Flink job,
> use execution.state-recovery.path=/path/to/savepoint, and
> set the operator UID on a custom-written operator, which opens the state
> info for you.
>
> The only drawback is that you must know the keyBy range... this can be
> problematic, but if you can do it, it's a win :)
>
> G
>
> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi Gabor,
>>
>> I thought so. I was hoping for a way to read the savepoint in pages,
>> instead of as a single blob up front, which I think is what the HashMap
>> backend does... we just want to be called for each entry and extract
>> the bit we want in that scenario.
>>
>> Never mind.
>>
>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>
>> JM
>>
>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> We've already realized that the RocksDB approach is not reaching the
>>> performance level it should. There is an open issue for it [1].
>>> The HashMap-based approach has always required more memory. So if the
>>> memory footprint is a hard requirement, then RocksDB is the only way
>>> now.
>>>
>>> Sad to say, but I can't suggest any nifty trick to make it better. All
>>> I can promise is that I'm now measuring the performance of the RocksDB
>>> approach and intend to eliminate the slowness. Since we don't know what
>>> exactly causes the slowness, the new FRocksDB 8.10.0 may also be an
>>> improvement.
>>>
>>> All in all, it will take some time to sort this out.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>
>>> BR,
>>> G
>>>
>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com>
>>> wrote:
>>>
>>>> What would be the best approach to read a savepoint while minimising
>>>> memory consumption? We just need to transform it into something else
>>>> for investigation.
>>>>
>>>> Our Flink 1.20 streaming job is using the HashMap state backend, and
>>>> is spread over 6 task slots in 6 pods (under k8s). Savepoints are
>>>> saved on S3. A savepoint can be 4-5 GB or more.
>>>>
>>>> The reader is more basic, using a local execution environment. This is
>>>> essentially what we are doing:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setParallelism(1);
>>>>
>>>> SavepointReader savepoint =
>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>     // SavepointReader.read(env, savepointLocation,
>>>>     //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>>
>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>     savepoint.readKeyedState(
>>>>         MAIN_OPERATOR,
>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>
>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>     mainOperatorState.executeAndCollect();
>>>> stateReader.forEachRemaining(record -> {
>>>>     ... // extract what we need here
>>>> });
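>>>>
>>>> For context: StateManagerJsonNodeReaderFunction extends the State
>>>> Processor API's KeyedStateReaderFunction. A trimmed-down sketch of the
>>>> pattern, with a placeholder state name and types instead of our real
>>>> ones:
>>>>
>>>> import org.apache.flink.api.common.state.ValueState;
>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> public class StateReaderSketch
>>>>         extends KeyedStateReaderFunction<String, String> {
>>>>
>>>>     private transient ValueState<String> state;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) {
>>>>         // name and type must match the state the operator registered
>>>>         state = getRuntimeContext().getState(
>>>>                 new ValueStateDescriptor<>("my-state", String.class));
>>>>     }
>>>>
>>>>     @Override
>>>>     public void readKey(String key, Context ctx, Collector<String> out)
>>>>             throws Exception {
>>>>         // invoked once for every key present in the savepoint
>>>>         out.collect(key + " -> " + state.value());
>>>>     }
>>>> }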
>>>>
>>>> We tried two approaches:
>>>> - One is to read the savepoint with a RocksDB backend. That works and
>>>> is low on memory usage, but it is very, very slow. We noticed the
>>>> iterator is available very early on, but it is slow...
>>>> - The other is to read the savepoint with a HashMap backend. That is
>>>> very fast, as expected. However, the iterator apparently only returns
>>>> once the whole savepoint has been loaded into the HashMap, so memory
>>>> consumption is heavy.
>>>>
>>>> Is there a better way to do that? Or a way to tune it so that it does
>>>> not consume all the memory? Or maybe reading it in parts...
>>>>
>>>> Thanks
>>>>
>>>> JM