Thanks both for your work on this! On a related note, since Queryable State (QS) is going away soon, streamlining the State Processor API as much as possible makes a lot of sense.
Are there any plans for a migration guide or something for users to adapt their QS observers (beyond the current docs)? (State-)Observability-wise, Flink has some room for improvement, I would say.

Regards,
Salva

On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Jean-Marc,
>
> Thanks for your time investment and for sharing the numbers, it's super
> helpful. Ping me any time when you have further info to share.
>
> About the numbers: 48 minutes for 6 GB is not good but not terrible.
> I've seen petabyte-scale states, so I'm pretty sure we need to go beyond...
>
> Since we measure similar numbers with unpatched Flink, and this has
> been reported by several users, we must make changes in this area.
> It's still a question whether the tested patch is the right approach,
> but at least we've touched the root cause.
>
> The next step on my side is to have a deep dive and understand all the
> aspects of why the remove is there, how eliminating the remove would
> affect existing use-cases, and consider all other possibilities.
>
> BR,
> G
>
>
> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi Gabor,
>>
>> I finally got to run that change through. I have a 6 GB savepoint I read
>> and parse for reference.
>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>> - RocksDB with the patch reads it in 48 minutes (and requires less than
>> 2 GB)
>> - RocksDB without the patch wasn't even halfway through after 12 hours...
>> (I gave up)
>>
>> I don't think I have any duplicates, because the application that
>> generates the savepoint is using HashMap, so my scenario may not be
>> representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>
>> That was run on a VM on my laptop, so not exactly a controlled
>> environment, but I think it's conclusive enough. I will need to run
>> further tests, but I think we will patch our Flink, using a system
>> property to configure it.
>>
>> Hope this helps
>>
>> JM
>>
>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Just to give an update. I've applied the mentioned patch and the
>>> execution time drastically decreased (the gain is 98.9%):
>>>
>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader
>>> [] - Execution time: PT14.690426S
>>>
>>> I need to double-check what that would mean for correctness and all
>>> other aspects.
>>>
>>> G
>>>
>>>
>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Please report back on how the patch behaves, including any side effects.
>>>>
>>>> Now I'm testing state reading with the Processor API vs the mentioned
>>>> job where we control the keys.
>>>> The difference is extreme, especially because the numbers are coming
>>>> from reading a ~40 MB state file😅
>>>>
>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader
>>>> [] - Execution time: PT22M24.612954S
>>>> ...
>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob
>>>> [] - Execution time: PT6.930659S
>>>>
>>>> Needless to say, the bigger number is the Processor API.
>>>>
>>>> G
>>>>
>>>>
>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>
>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how
>>>>> that goes.
>>>>> If that brings RocksDB performance into an acceptable range for us,
>>>>> we might go that way. I really like the light memory consumption of
>>>>> RocksDB for that kind of side job.
>>>>>
>>>>> Thanks
>>>>>
>>>>> JM
>>>>>
>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> What I could imagine is to create a normal Flink job,
>>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>>> set the operator UID on a custom written operator, which opens the
>>>>>> state info for you.
>>>>>>
>>>>>> The only drawback is that you must know the keyBy range... this can
>>>>>> be problematic, but if you can do it, it's a win :)
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Gabor,
>>>>>>>
>>>>>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>>>>>> instead of as a single blob up front, which I think is what the hashmap
>>>>>>> does... we just want to be called for each entry and extract the bit we
>>>>>>> want in that scenario.
>>>>>>>
>>>>>>> Never mind.
>>>>>>>
>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>
>>>>>>> JM
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Jean-Marc,
>>>>>>>>
>>>>>>>> We've already realized that the RocksDB approach is not reaching
>>>>>>>> the performance it should. There is an open issue for it [1].
>>>>>>>> The hashmap-based approach has always required more memory.
>>>>>>>> So if the memory footprint is a hard requirement, then RocksDB is
>>>>>>>> the only way now.
>>>>>>>>
>>>>>>>> Bad to say, but I can't suggest any nifty trick to make it better.
>>>>>>>> All I can promise is that I'm now measuring the performance of the
>>>>>>>> RocksDB approach and intend to eliminate the slowness. Since we
>>>>>>>> don't know what exactly causes the slowness, the new FRocksDB 8.10.0
>>>>>>>> can also be an improvement.
>>>>>>>>
>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>
>>>>>>>> BR,
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What would be the best approach to read a savepoint and minimise
>>>>>>>>> the memory consumption? We just need to transform it into something
>>>>>>>>> else for investigation.
>>>>>>>>>
>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is
>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are saved
>>>>>>>>> on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>
>>>>>>>>> The reader is more basic, using a local execution environment.
>>>>>>>>> This is essentially what we are doing:
>>>>>>>>>
>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>> env.setParallelism(1);
>>>>>>>>>
>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>
>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>
>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>     ...
>>>>>>>>>     // extract what we need here
>>>>>>>>> });
>>>>>>>>>
>>>>>>>>> We tried two approaches:
>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works
>>>>>>>>> and is low on memory usage, but is very, very slow. We noticed the
>>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That
>>>>>>>>> is very fast, as expected. However, the iterator apparently only
>>>>>>>>> returns once the whole savepoint has been loaded into the HashMap,
>>>>>>>>> so heavy memory consumption.
>>>>>>>>>
>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it
>>>>>>>>> does not consume all the memory? Or maybe reading it in parts...
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>
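For reference, below is a minimal sketch of the alternative Gabor describes above: restore a regular job from the savepoint via execution.state-recovery.path and read the state out of an operator whose UID matches the one in the savepoint. The class names, key values, savepoint path, state descriptor name, and the ignore-unclaimed-state setting are illustrative assumptions, not taken from the thread; the UID, the keyBy, the key/value types, and the maximum parallelism must match the original job for the restore to work.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SavepointStateDumpJob {

    // Placeholder: must be the UID of the stateful operator inside the savepoint.
    private static final String MAIN_OPERATOR = "main-operator";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Restore the whole job from the existing savepoint instead of starting empty.
        conf.setString("execution.state-recovery.path", "s3://bucket/path/to/savepoint");
        // Likely needed if the savepoint also holds state for operators this dump job doesn't declare.
        conf.setBoolean("execution.state-recovery.ignore-unclaimed-state", true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // The keyBy range has to be known up front: emit every key whose state should be dumped.
        env.fromData("key-1", "key-2", "key-3")
                .keyBy(key -> key)                 // must reproduce the original job's keyBy
                .process(new StateDumpFunction())
                .uid(MAIN_OPERATOR)                // the UID match is what makes the state restore here
                .print();

        env.execute("savepoint-state-dump");
    }

    /** Declares the same state as the original operator and emits the restored value per key. */
    static class StateDumpFunction extends KeyedProcessFunction<String, String, String> {

        // StateManager is the state type used by the original job (referenced in the thread).
        private transient ValueState<StateManager> state;

        @Override
        public void open(Configuration parameters) {
            // Descriptor name and type must match the original operator's state declaration.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state", StateManager.class));
        }

        @Override
        public void processElement(String key, Context ctx, Collector<String> out) throws Exception {
            // state.value() returns whatever the savepoint holds for the current key.
            out.collect(key + " -> " + state.value());
        }
    }
}

As noted in the thread, the main drawback is that the key range must be enumerable; per-key reads avoid loading the whole savepoint into memory the way the HashMap-based SavepointReader does.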