Hi Jean-Marc,

Thanks for investing the time and sharing the numbers, it's super helpful. Ping me any time you have further info to share.
About the numbers: 48 minutes for 6 GB is not good but not terrible. I've seen petabyte-scale states, so I'm pretty sure we need to go beyond that... Since we measure similar numbers with unpatched Flink, and this has been reported by several users, we must make changes in this area. It's still an open question whether the tested patch is the right approach, but at least we've touched the root cause.

The next step on my side is to do a deep dive and understand why the remove is there and how eliminating it would affect existing use cases, and to consider all other possibilities.

BR,
G

On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> Hi Gabor,
>
> I finally got to run that change through. I have a 6 GB savepoint I read
> and parse for reference.
> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
> - RocksDB with the patch reads it in 48 minutes (and requires less than 2 GB)
> - RocksDB without the patch wasn't even halfway through after 12 hours...
> (I gave up)
>
> I don't think I have any duplicates because the application that generates
> the savepoint is using HashMap, so my scenario may not be representative. I
> am using IBM Semeru Java 17 (openJ9-0.46).
>
> That was run on a VM on my laptop, so not exactly a
> controlled environment, but I think it's conclusive enough. I will need to
> run further tests, but I think we will patch our Flink, using a system
> property to configure it.
>
> Hope this helps
>
> JM
>
> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Just to give an update. I've applied the mentioned patch and the
>> execution time drastically decreased (the gain is 98.9%):
>>
>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader
>> [] - Execution time: PT14.690426S
>>
>> I need to double check what that would mean for correctness and all other
>> aspects.
>>
>> G
>>
>>
>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Please report back on how the patch behaves, including any side effects.
>>>
>>> I'm now testing state reading with the processor API vs the mentioned
>>> job where we control the keys.
>>> The difference is extreme, especially because the numbers are coming
>>> from reading a ~40 MB state file😅
>>>
>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader
>>> [] - Execution time: PT22M24.612954S
>>> ...
>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob
>>> [] - Execution time: PT6.930659S
>>>
>>> Needless to say, the bigger number is the processor API.
>>>
>>> G
>>>
>>>
>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>
>>>> That's a good idea. Sadly, I have no control over the keys...
>>>>
>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how
>>>> that goes. If that brings RocksDB performance into an acceptable range for us
>>>> we might go that way. I really like the light memory consumption of RocksDB
>>>> for that kind of side job.
>>>>
>>>> Thanks
>>>>
>>>> JM
>>>>
>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> What I could imagine is to create a normal Flink job,
>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>> set the operator UID on a custom-written operator, which opens the
>>>>> state info for you.
>>>>>
>>>>> The only drawback is that you must know the keyBy range... this can be
>>>>> problematic, but if you can do it, it's a win :)
>>>>>
>>>>> G
>>>>>
>>>>>
>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>> wrote:
>>>>>
>>>>>> Hi Gabor,
>>>>>>
>>>>>> I thought so.
>>>>>> I was hoping for a way to read the savepoint in pages,
>>>>>> instead of as a single blob up front, which I think is what the HashMap
>>>>>> backend does... we just want to be called for each entry and extract the
>>>>>> bit we want in that scenario.
>>>>>>
>>>>>> Never mind
>>>>>>
>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>
>>>>>> JM
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jean-Marc,
>>>>>>>
>>>>>>> We've already realized that the RocksDB approach is not meeting the
>>>>>>> performance criteria it should. There is an open issue for it [1].
>>>>>>> The hashmap-based approach has always required more memory.
>>>>>>> So if the memory footprint is a hard requirement, then RocksDB is the
>>>>>>> only way for now.
>>>>>>>
>>>>>>> Sad to say, but I can't suggest any nifty trick to make it better.
>>>>>>> All I can promise is that I'm now measuring the performance of the
>>>>>>> RocksDB approach and intend to eliminate the slowness. Since we don't
>>>>>>> know what exactly causes the slowness, the new Frocksdb-8.10.0 may also
>>>>>>> be an improvement.
>>>>>>>
>>>>>>> All in all it will take some time to sort this out.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>
>>>>>>>> What would be the best approach to read a savepoint and minimise
>>>>>>>> the memory consumption? We just need to transform it into something
>>>>>>>> else for investigation.
>>>>>>>>
>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is
>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are saved
>>>>>>>> on S3.
>>>>>>>> A savepoint can be 4-5 GB or more.
>>>>>>>>
>>>>>>>> The reader is more basic, using a local execution environment. This
>>>>>>>> is essentially what we are doing:
>>>>>>>>
>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>> env.setParallelism(1);
>>>>>>>>
>>>>>>>> SavepointReader savepoint =
>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>
>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>
>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>     ...
>>>>>>>>     // extract what we need here
>>>>>>>> });
>>>>>>>>
>>>>>>>>
>>>>>>>> We tried two approaches:
>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works
>>>>>>>> and is low on memory usage, but is very, very slow. We noticed the
>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>> - The other is to read the savepoint with a HashMap backend. That
>>>>>>>> is very fast, as expected. However, the iterator apparently only returns
>>>>>>>> once the whole savepoint has been loaded into the HashMap, hence the
>>>>>>>> heavy memory consumption.
>>>>>>>>
>>>>>>>> Is there a better way to do that? Or a way to tune it so that it
>>>>>>>> does not consume all the memory? Or maybe reading it in parts...
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> JM
>>>>>>>>
>>>>>>>
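
A quick sanity check on the timings quoted in this thread. The sketch below only re-derives figures from the numbers already posted: the 98.9% gain from the two FlinkTestStateReader durations, and the rough throughput for the 6 GB savepoint. The class and method names are hypothetical helpers, not part of Flink, and the throughput assumes 1 GB = 1000 MB.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical helper, not part of Flink: re-derives figures quoted in the thread.
class TimingCheck {

    // Percentage of wall-clock time saved when a run drops from `before` to `after`.
    static double gainPercent(Duration before, Duration after) {
        return 100.0 * (1.0 - (double) after.toNanos() / before.toNanos());
    }

    // Average throughput in MB/s, assuming 1 GB = 1000 MB.
    static double throughputMBps(double sizeGb, Duration elapsed) {
        return sizeGb * 1000.0 / (elapsed.toNanos() / 1e9);
    }

    public static void main(String[] args) {
        // ISO-8601 durations copied from the log lines in the thread.
        Duration unpatched = Duration.parse("PT22M24.612954S");
        Duration patched = Duration.parse("PT14.690426S");

        System.out.printf(Locale.ROOT, "gain: %.1f%%%n", gainPercent(unpatched, patched));
        System.out.printf(Locale.ROOT, "HashMap: %.1f MB/s%n",
                throughputMBps(6, Duration.ofMinutes(14)));
        System.out.printf(Locale.ROOT, "RocksDB (patched): %.1f MB/s%n",
                throughputMBps(6, Duration.ofMinutes(48)));
    }
}
```

Under those assumptions this gives a gain of about 98.9%, matching the figure reported for the patch, and roughly 7.1 MB/s for HashMap versus 2.1 MB/s for patched RocksDB on the laptop VM.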