Thanks both for your work on this!

On a related note, since Queryable State (QS) is going away soon,
streamlining the State Processor API as much as possible makes a lot of
sense.

Are there any plans for a migration guide or similar to help users adapt
their QS observers (beyond the current docs)? In terms of (state)
observability, I would say Flink has some room for improvement.

Regards,

Salva


On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Jean-Marc,
>
> Thanks for investing your time and sharing the numbers, it's super
> helpful.
> Ping me any time you have further info to share.
>
> About the numbers: 48 minutes for 6 GB is not good but not terrible.
> I've seen petabyte-scale states so I'm pretty sure we need to go beyond...
>
> Since we measure similar numbers with unpatched Flink, and this has
> been reported by several users,
> we must make changes in this area. It's still an open question whether the
> tested patch is the right approach,
> but at least we've touched the root cause.
>
> The next step on my side is to do a deep dive to understand why the
> remove is there,
> how eliminating it would affect existing use cases, and to consider
> all other possibilities.
>
> BR,
> G
>
>
> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi Gabor,
>>
>> I finally got to run that change through. I have a 6 GB savepoint that I
>> read and parse for reference.
>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>> - RocksDB with the patch reads it in 48 minutes (and requires less than
>> 2 GB)
>> - RocksDB without the patch wasn't even halfway through after 12 hours....
>> (I gave up)
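To put the three measurements above on a common scale, here is a rough sketch that converts them into average read rates. It assumes "6Gb" means 6 gigabytes, treats 1 GB as 1024 MiB, and reads "wasn't even halfway through after 12 hours" as "less than 3 GB read", so the last figure is only an upper bound. The class and method names are made up for illustration.

```java
import java.util.Locale;

public class SavepointReadRate {
    // Average read rate in MiB/s, treating 1 GB as 1024 MiB (an assumption).
    static double mibPerSecond(double sizeGb, double minutes) {
        return sizeGb * 1024.0 / (minutes * 60.0);
    }

    public static void main(String[] args) {
        double sizeGb = 6.0; // savepoint size reported in the message

        System.out.printf(Locale.ROOT, "HashMap:           %.2f MiB/s%n",
                mibPerSecond(sizeGb, 14));
        System.out.printf(Locale.ROOT, "RocksDB patched:   %.2f MiB/s%n",
                mibPerSecond(sizeGb, 48));
        // Unpatched RocksDB had read less than half of the savepoint after
        // 12 hours, so this is an upper bound, not a measurement.
        System.out.printf(Locale.ROOT, "RocksDB unpatched: < %.3f MiB/s%n",
                mibPerSecond(sizeGb / 2, 12 * 60));
    }
}
```

Under those assumptions the patched RocksDB run is roughly 3.4 times slower than HashMap, while the unpatched run is at least 30 times slower than the patched one.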
>>
>> I don't think I have any duplicates because the application that
>> generates the savepoint is using HashMap, so my scenario may not be
>> representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>
>> That was run on a VM on my laptop, so not exactly a
>> controlled environment, but I think it's conclusive enough. I will need to
>> run further tests, but I think we will patch our Flink, using a system
>> property to configure it.
>>
>> Hope this helps
>>
>> JM
>>
>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Just to give an update. I've applied the mentioned patch and the
>>> execution time drastically decreased (the gain is 98.9%):
>>>
>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>
>>> I need to double-check what that means for correctness and all other
>>> aspects.
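For reference, the 98.9% figure can be reproduced from the logged ISO-8601 durations. This is a minimal sketch that assumes the baseline is the earlier unpatched FlinkTestStateReader run (PT22M24.612954S) quoted further down in this thread; the class and method names are made up for illustration.

```java
import java.time.Duration;
import java.util.Locale;

public class PatchGain {
    // Fraction of the baseline run time that the patched run saves.
    static double gain(Duration baseline, Duration patched) {
        double base = baseline.toNanos();
        return (base - patched.toNanos()) / base;
    }

    public static void main(String[] args) {
        // Durations copied from the "Execution time" log lines in this thread.
        Duration unpatched = Duration.parse("PT22M24.612954S"); // assumed baseline
        Duration patched = Duration.parse("PT14.690426S");

        System.out.printf(Locale.ROOT, "gain: %.1f%%%n",
                100 * gain(unpatched, patched));
    }
}
```

Put differently, the patched run is roughly 90 times faster than the assumed baseline on that ~40 MB state file.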
>>>
>>> G
>>>
>>>
>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Please report back on how the patch behaves including any side effects.
>>>>
>>>> I'm now testing state reading with the State Processor API vs the
>>>> mentioned job where we control the keys.
>>>> The difference is extreme, especially given that the numbers come
>>>> from reading a ~40 MB state file😅
>>>>
>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>> ...
>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>
>>>> Needless to say, the larger number is the State Processor API.
>>>>
>>>> G
>>>>
>>>>
>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> That's a good idea. Sadly, I have no control over the keys....
>>>>>
>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see how
>>>>> that goes. If that brings RocksDB performance into an acceptable range
>>>>> for us, we might go that way. I really like the light memory
>>>>> consumption of RocksDB for that kind of side job.
>>>>>
>>>>> Thanks
>>>>>
>>>>> JM
>>>>>
>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> What I could imagine is to create a normal Flink job,
>>>>>> set execution.state-recovery.path=/path/to/savepoint, and
>>>>>> set the operator UID on a custom-written operator, which opens the
>>>>>> state info for you.
>>>>>>
>>>>>> The only drawback is that you must know the keyBy range... this can
>>>>>> be problematic, but if you can do it, it's a win :)
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Gabor,
>>>>>>>
>>>>>>> I thought so. I was hoping for a way to read the savepoint in pages,
>>>>>>> instead of as a single blob up front, which I think is what the
>>>>>>> HashMap backend does... We just want to be called for each entry and
>>>>>>> extract the bit we want in that scenario.
>>>>>>>
>>>>>>> Never mind
>>>>>>>
>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>
>>>>>>> JM
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Jean-Marc,
>>>>>>>>
>>>>>>>> We've already realized that the RocksDB approach is not meeting
>>>>>>>> the performance criteria it should. There is an open issue for it
>>>>>>>> [1].
>>>>>>>> The HashMap-based approach has always required more memory.
>>>>>>>> So if the memory footprint is a hard requirement, then RocksDB is
>>>>>>>> the only way for now.
>>>>>>>>
>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it better.
>>>>>>>> All I can promise is that I'm now measuring the performance of the
>>>>>>>> RocksDB approach and intend to eliminate the slowness. Since we
>>>>>>>> don't know what exactly causes the slowness, the new Frocksdb-8.10.0
>>>>>>>> may also be an improvement.
>>>>>>>>
>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>
>>>>>>>> BR,
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What would be the best approach to read a savepoint while
>>>>>>>>> minimising memory consumption? We just need to transform it into
>>>>>>>>> something else for investigation.
>>>>>>>>>
>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is
>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are
>>>>>>>>> saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>
>>>>>>>>> The reader is more basic, using a local execution environment.
>>>>>>>>> This is essentially what we are doing:
>>>>>>>>>
>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>
>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>
>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>
>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>     stateReader.forEachRemaining(record -> { ...
>>>>>>>>>         // extract what we need here
>>>>>>>>>     });
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We tried two approaches:
>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works
>>>>>>>>> and is low on memory usage, but is very, very slow. We noticed the
>>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That
>>>>>>>>> is very fast, as expected. However, the iterator apparently only
>>>>>>>>> returns once the whole savepoint has been loaded into the HashMap,
>>>>>>>>> hence the heavy memory consumption.
>>>>>>>>>
>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it
>>>>>>>>> does not consume all the memory? Or maybe reading it in parts...
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>
