Hi Gabor,

I'm trying to, but I'm struggling to compile my code against your Flink build.
I also tried to apply your PR as a patch on my modified 1.20 fork, and that
didn't go well either. It will take time to untangle.

I will keep you updated if I make progress.

JM



On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Jean-Marc,
>
> FYI, I've just opened this [1] PR to address the issue in a clean way.
> May I ask you to test it on your side?
>
> [1] https://github.com/apache/flink/pull/26134
>
> BR,
> G
>
>
> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Just a little update on this. We've made our first POC with the
>> redesigned approach and the numbers are promising :)
>> It still requires a huge effort from a development/correctness/performance
>> perspective, but it seems like we have something in the pocket.
>>
>> Test data: 256 MB state file with a single operator and 2 value states
>> - Old execution time: 25M27.126737S
>> - New execution time: 1M19.602042S
>> In short: ~95% performance gain.
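
As a quick sanity check on the figure above, the gain can be recomputed from the two timings. This is only a sketch: it assumes the timings are ISO-8601 durations with the `PT` prefix dropped (the prefix is added back before parsing), and `ExecutionTimeGain` with its methods is a made-up helper, not part of Flink.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: recomputes the relative gain from two timings.
final class ExecutionTimeGain {

    // Assumes the timing is an ISO-8601 duration without its "PT" prefix,
    // e.g. "25M27.126737S" as quoted in the message above.
    static Duration parseTiming(String timing) {
        return Duration.parse("PT" + timing);
    }

    // Returns the share of the old execution time that was saved, in percent.
    static double gainPercent(Duration oldTime, Duration newTime) {
        return 100.0 * (1.0 - (double) newTime.toNanos() / oldTime.toNanos());
    }

    public static void main(String[] args) {
        Duration oldTime = parseTiming("25M27.126737S");
        Duration newTime = parseTiming("1M19.602042S");
        System.out.printf("Gain: %.1f%%%n", gainPercent(oldTime, newTime));
    }
}
```

With the two timings quoted above, this comes out just under 95%, which is consistent with the rounded figure.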
>>
>> G
>>
>>
>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> In short, as long as you don't care about multiple
>>> KeyedStateReaderFunction.readKey calls, you're on the safe
>>> side.
>>>
>>> G
>>>
>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>
>>>> I am still hoping that I am good. I just read the savepoint to
>>>> extract information (parallelism 1, and only 1 task manager). I also know
>>>> it has been created by a job using a HashMap backend. And I do not care
>>>> about duplicates.
>>>>
>>>> I should still be good, right? From what I saw, I never read any
>>>> duplicate keys.
>>>>
>>>> Thanks
>>>>
>>>> JM
>>>>
>>>>
>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> We've just had an in-depth analysis and we think that removing that
>>>>> particular line causes correctness issues under some circumstances.
>>>>>
>>>>> Namely, key duplicates can happen when multiple column families are
>>>>> processed at the same time. Needless to say, that would cause multiple
>>>>> `readKey` calls, which ends up in plainly wrong calculation logic (for
>>>>> example in a simple sum calculation).
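
To illustrate why duplicates matter for something like a sum, here is a plain-Java sketch with no Flink dependency. It assumes a reader that may hand the same key to the callback more than once, which is the situation described above. `KeyedValue`, `naiveSum` and `dedupedSum` are hypothetical names, and the de-duplication shown is just one defensive option on the consumer side, not what Flink does internally.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration (no Flink): how duplicate key deliveries skew a simple sum.
final class DuplicateKeySum {

    // Hypothetical stand-in for what a readKey-style callback would emit.
    static final class KeyedValue {
        final String key;
        final long value;

        KeyedValue(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Adds up every delivered value; a key delivered twice is counted twice.
    static long naiveSum(List<KeyedValue> delivered) {
        long sum = 0;
        for (KeyedValue kv : delivered) {
            sum += kv.value;
        }
        return sum;
    }

    // Counts each key once by remembering the keys already seen.
    // Note that the set grows with the number of distinct keys.
    static long dedupedSum(List<KeyedValue> delivered) {
        Set<String> seen = new HashSet<>();
        long sum = 0;
        for (KeyedValue kv : delivered) {
            if (seen.add(kv.key)) {
                sum += kv.value;
            }
        }
        return sum;
    }

    // Sample input in which key "a" is delivered a second time.
    static List<KeyedValue> sampleDeliveries() {
        return Arrays.asList(
                new KeyedValue("a", 10),
                new KeyedValue("b", 5),
                new KeyedValue("a", 10));
    }

    public static void main(String[] args) {
        List<KeyedValue> delivered = sampleDeliveries();
        System.out.println("naive sum:   " + naiveSum(delivered));   // 25
        System.out.println("deduped sum: " + dedupedSum(delivered)); // 15
    }
}
```

The trade-off is memory: tracking seen keys costs space proportional to the number of distinct keys, so it only suits cases where the key set fits comfortably in memory.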
>>>>>
>>>>> We have a vision for how this can be solved in a clean way, but it will
>>>>> take some time.
>>>>>
>>>>> > Are there any plans on a migration guide or something for users to
>>>>> > adapt their QS observers (beyond the current docs)?
>>>>>
>>>>> The gap between the two approaches is quite large, and considering the
>>>>> actual bugs and improvement possibilities in the State Processor API,
>>>>> I would say this can come later on, at least on my plate. When you see
>>>>> the gaps and you know how to fill them, feel free to contribute and
>>>>> we can shepherd the PRs.
>>>>>
>>>>> G
>>>>>
>>>>>
>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <
>>>>> salcantara...@gmail.com> wrote:
>>>>>
>>>>>> Thanks both for your work on this!
>>>>>>
>>>>>> On a related note, since Queryable State (QS) is going away soon,
>>>>>> streamlining the State Processor API as much as possible makes a lot of
>>>>>> sense.
>>>>>>
>>>>>> Are there any plans on a migration guide or something for users to
>>>>>> adapt their QS observers (beyond the current docs)?
>>>>>> (State-)Observability-wise, Flink has some room for improvement, I would
>>>>>> say.
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Salva
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jean-Marc,
>>>>>>>
>>>>>>> Thanks for investing your time and sharing the numbers, it's super
>>>>>>> helpful.
>>>>>>> Ping me any time when you have further info to share.
>>>>>>>
>>>>>>> About the numbers: 48 minutes for 6 GB is not good but not terrible.
>>>>>>> I've seen petabyte scale states so I'm pretty sure we need to go
>>>>>>> beyond...
>>>>>>>
>>>>>>> Since we measure similar numbers with the unpatched Flink, and this
>>>>>>> has been reported by several users,
>>>>>>> we must make changes in this area. It's still a question whether the
>>>>>>> tested patch is the right approach
>>>>>>> but at least we've touched the root cause.
>>>>>>>
>>>>>>> The next step on my side is to take a deep dive and understand all
>>>>>>> the reasons why the remove is there,
>>>>>>> how eliminating the remove would affect existing use-cases, and
>>>>>>> consider all other possibilities.
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Gabor,
>>>>>>>>
>>>>>>>> I finally got to run that change through. I have a 6 GB savepoint I
>>>>>>>> read and parse for reference.
>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less
>>>>>>>> than 2 GB)
>>>>>>>> - RocksDB without the patch wasn't even halfway through after 12
>>>>>>>> hours... (I gave up)
>>>>>>>>
>>>>>>>> I don't think I have any duplicates because the application that
>>>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>>>> representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>>>>>>>
>>>>>>>> That was run on a VM on my laptop, so not exactly a
>>>>>>>> controlled environment, but I think it's conclusive enough. I will
>>>>>>>> need to run further tests, but I think we will patch our Flink, using
>>>>>>>> a system property to configure it.
>>>>>>>>
>>>>>>>> Hope this helps
>>>>>>>>
>>>>>>>> JM
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>
>>>>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>
>>>>>>>>> I need to double-check what that would mean for correctness and all
>>>>>>>>> other aspects.
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Please report back on how the patch behaves including any side
>>>>>>>>>> effects.
>>>>>>>>>>
>>>>>>>>>> I'm now testing state reading with the processor API vs the
>>>>>>>>>> mentioned job where we control the keys.
>>>>>>>>>> The difference is extreme, especially because the numbers are
>>>>>>>>>> coming from reading a ~40 MB state file 😅
>>>>>>>>>>
>>>>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>> ...
>>>>>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>
>>>>>>>>>> Needless to say, the bigger number is the processor API.
>>>>>>>>>>
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> That's a good idea. Sadly, I have no control over the keys...
>>>>>>>>>>>
>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to
>>>>>>>>>>> see how that goes. If that brings RocksDB performance into an
>>>>>>>>>>> acceptable range for us, we might go that way. I really like the
>>>>>>>>>>> light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> JM
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint,
>>>>>>>>>>>> and set the operator UID on a custom-written operator, which opens
>>>>>>>>>>>> the state info for you.
>>>>>>>>>>>>
>>>>>>>>>>>> The only drawback is that you must know the keyBy range... this
>>>>>>>>>>>> can be problematic, but if you can do it, it's a win :)
>>>>>>>>>>>>
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <
>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>>>>> pages, instead of as a single blob up front, which I think is what
>>>>>>>>>>>>> the HashMap backend does... We just want to be called for each
>>>>>>>>>>>>> entry and extract the bit we want in that scenario.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>>> nothing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>>> reaching the performance it should. There is an open
>>>>>>>>>>>>>> issue for it [1].
>>>>>>>>>>>>>> The HashMap-based approach has always required more
>>>>>>>>>>>>>> memory. So if the memory footprint is a hard requirement, then
>>>>>>>>>>>>>> RocksDB is the only way for now.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it
>>>>>>>>>>>>>> better. All I can promise is that I'm now measuring the
>>>>>>>>>>>>>> performance of the RocksDB approach
>>>>>>>>>>>>>> and intend to eliminate the slowness. Since we don't know
>>>>>>>>>>>>>> what exactly causes the slowness, the new Frocksdb-8.10.0 may
>>>>>>>>>>>>>> also be an improvement.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>>> minimise the memory consumption? We just need to transform it
>>>>>>>>>>>>>>> into something else for investigation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and
>>>>>>>>>>>>>>> is spread over 6 task slots in 6 pods (under k8s). Savepoints
>>>>>>>>>>>>>>> are saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation,
>>>>>>>>>>>>>>>     //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>     stateReader.forEachRemaining(record -> { ...
>>>>>>>>>>>>>>>         // extract what we need here
>>>>>>>>>>>>>>>     });
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That
>>>>>>>>>>>>>>> works and is low on memory usage, but is very, very slow. We
>>>>>>>>>>>>>>> noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap
>>>>>>>>>>>>>>> backend. That is very fast, as expected. However, the iterator
>>>>>>>>>>>>>>> apparently only returns once the whole savepoint has been loaded
>>>>>>>>>>>>>>> into the HashMap, so memory consumption is heavy.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so
>>>>>>>>>>>>>>> that it does not consume all the memory? Or maybe reading it
>>>>>>>>>>>>>>> in parts...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
