In short, if you don't care about multiple
KeyedStateReaderFunction.readKey calls for the same key, then you're on the
safe side.
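
To illustrate the difference, here is a minimal sketch in plain Java. It does
not use Flink at all: the Call record and both helper methods are hypothetical
stand-ins for what a readKey implementation might do with each invocation,
and the duplicated key "b" is an assumed example, not data from a real
savepoint.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DuplicateReadKeyDemo {

    /** One simulated readKey invocation: a key and the value stored for it (hypothetical type). */
    public record Call(String key, long value) {}

    /** Accumulating logic: every invocation is added, so a repeated key inflates the total. */
    public static long naiveSum(List<Call> calls) {
        long total = 0;
        for (Call call : calls) {
            total += call.value();
        }
        return total;
    }

    /** Idempotent extraction: results are stored per key, so a repeated key overwrites itself. */
    public static Map<String, Long> extractPerKey(List<Call> calls) {
        Map<String, Long> byKey = new LinkedHashMap<>();
        for (Call call : calls) {
            byKey.put(call.key(), call.value());
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Key "b" is delivered twice, as could happen if readKey is called more than once for a key.
        List<Call> calls = List.of(
            new Call("a", 10), new Call("b", 20), new Call("b", 20), new Call("c", 30));

        long perKeySum = extractPerKey(calls).values().stream().mapToLong(Long::longValue).sum();
        System.out.println("naive sum   = " + naiveSum(calls)); // 80, inflated by the duplicate
        System.out.println("per-key sum = " + perKeySum);       // 60, unaffected
    }
}
```

A reader that only extracts or overwrites per-key information behaves like
extractPerKey and tolerates duplicates; anything that accumulates across
calls behaves like naiveSum and does not.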

G

On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> I am still hoping that I am good. I just read the savepoint to
> extract information (parallelism 1, and only 1 task manager). I also know
> it has been created by a job using a HashMap backend. And I do not care
> about duplicates.
>
> I should still be good, right? From what I saw, I never read any duplicate
> keys.
>
> Thanks
>
> JM
>
>
> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> We've just had an in-depth analysis and we think that removing that
>> particular line causes correctness issues under some circumstances.
>>
>> Namely, key duplicates can happen when multiple column families are
>> processed at the same time. Needless to say, that would cause multiple
>> `readKey` calls for the same key, which leads to wrong results (for
>> example in a simple sum calculation).
>>
>> We have an idea of how this can be solved in a clean way, but it will take
>> some time.
>>
>> > Are there any plans on a migration guide or something for users to
>> adapt their QS observers (beyond the current docs)?
>>
>> The gap between the two approaches is quite large, and considering the
>> actual bugs and improvement possibilities in the State Processor API,
>> I would say this can come later, at least on my plate. If you see the
>> gaps and know how to fill them, feel free to contribute and
>> we can shepherd the PRs.
>>
>> G
>>
>>
>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Thanks both for your work on this!
>>>
>>> On a related note, since Queryable State (QS) is going away soon,
>>> streamlining the State Processor API as much as possible makes a lot of
>>> sense.
>>>
>>> Are there any plans on a migration guide or something for users to adapt
>>> their QS observers (beyond the current docs)? (State-)Observability-wise
>>> Flink has some room for improvement I would say.
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>>
>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jean-Marc,
>>>>
>>>> Thanks for investing the time and sharing the numbers, it's super
>>>> helpful.
>>>> Ping me any time when you have further info to share.
>>>>
>>>> About the numbers: 48 minutes for 6 GB is not good but not terrible.
>>>> I've seen petabyte-scale states, so I'm pretty sure we need to go
>>>> beyond...
>>>>
>>>> Since we measure similar numbers with unpatched Flink, and this has
>>>> been reported by several users,
>>>> we must make changes in this area. It's still an open question whether the
>>>> tested patch is the right approach,
>>>> but at least we've touched the root cause.
>>>>
>>>> The next step on my side is to do a deep dive and understand all the
>>>> reasons why the remove is there,
>>>> how eliminating the remove would affect existing use cases, and consider
>>>> all other possibilities.
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> I finally got to run that change through. I have a 6 GB savepoint I
>>>>> read and parse for reference.
>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less than
>>>>> 2 GB)
>>>>> - RocksDB without the patch wasn't even halfway through after 12
>>>>> hours... (I gave up)
>>>>>
>>>>> I don't think I have any duplicates because the application that
>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>> representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>>>>
>>>>> That was run on a VM on my laptop, so not exactly a
>>>>> controlled environment, but I think it's conclusive enough. I will need to
>>>>> run further tests, but I think we will patch our Flink, using a system
>>>>> property to configure it.
>>>>>
>>>>> Hope this helps
>>>>>
>>>>> JM
>>>>>
>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>
>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>
>>>>>> I need to double-check what that would mean for correctness and all
>>>>>> other aspects.
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Please report back on how the patch behaves including any side
>>>>>>> effects.
>>>>>>>
>>>>>>> I'm now comparing state reading with the State Processor API against
>>>>>>> the mentioned job where we control the keys.
>>>>>>> The difference is extreme, especially because the numbers come
>>>>>>> from reading a ~40 MB state file 😅
>>>>>>>
>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>> ...
>>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>
>>>>>>> Needless to say, the larger number is the State Processor API.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That's a good idea. Sadly, I have no control over the keys...
>>>>>>>>
>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see
>>>>>>>> how that goes. If that brings RocksDB performance into an acceptable
>>>>>>>> range for us, we might go that way. I really like the light memory
>>>>>>>> consumption of RocksDB for that kind of side job.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> JM
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint,
>>>>>>>>> and set the operator UID on a custom-written operator, which opens the
>>>>>>>>> state info for you.
>>>>>>>>>
>>>>>>>>> The only drawback is that you must know the keyBy range... this
>>>>>>>>> can be problematic, but if you can do it, it's a win :)
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gabor,
>>>>>>>>>>
>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>> pages, instead of as a single blob up front, which I think is what the
>>>>>>>>>> HashMap backend does... We just want to be called for each entry and
>>>>>>>>>> extract the bit we want in that scenario.
>>>>>>>>>>
>>>>>>>>>> Never mind
>>>>>>>>>>
>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>>
>>>>>>>>>> JM
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>
>>>>>>>>>>> We've already realized that the RocksDB approach does not reach
>>>>>>>>>>> the performance it should. There is an open issue for it
>>>>>>>>>>> [1].
>>>>>>>>>>> The HashMap-based approach has always required more
>>>>>>>>>>> memory. So if the memory footprint is a hard requirement, then
>>>>>>>>>>> RocksDB is the only way for now.
>>>>>>>>>>>
>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it
>>>>>>>>>>> better. All I can promise is that I'm now measuring the performance
>>>>>>>>>>> of the RocksDB approach
>>>>>>>>>>> and intend to eliminate the slowness. Since we don't know what
>>>>>>>>>>> exactly causes the slowness, the new Frocksdb-8.10.0 may also be an
>>>>>>>>>>> improvement.
>>>>>>>>>>>
>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>
>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>
>>>>>>>>>>> BR,
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>> minimise the memory consumption? We just need to transform it into
>>>>>>>>>>>> something else for investigation.
>>>>>>>>>>>>
>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is
>>>>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are
>>>>>>>>>>>> saved on S3.
>>>>>>>>>>>> A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>
>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>
>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>
>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>
>>>>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>
>>>>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>>>>     stateReader.forEachRemaining(record -> { ...
>>>>>>>>>>>>         // extract what we need here
>>>>>>>>>>>>     });
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That
>>>>>>>>>>>> works and is low on memory usage, but is very, very slow. We
>>>>>>>>>>>> noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend.
>>>>>>>>>>>> That is very fast, as expected. However, the iterator apparently
>>>>>>>>>>>> only returns once the whole savepoint has been loaded in the HashMap,
>>>>>>>>>>>> so heavy memory consumption.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that
>>>>>>>>>>>> it does not consume all the memory? Or maybe reading it in
>>>>>>>>>>>> parts...
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> JM
>>>>>>>>>>>>
>>>>>>>>>>>
