I am still hoping that I am still good. I just read the savepoint to
extract information (parallelism 1, and only 1 task manager) . I also know
it has been created by a job using a HashMap backend. And I do not care
about duplicates.

I should still be good, right? from what I saw I never read any duplicate
keys.

Thanks

JM


On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Guys,
>
> We've just had an in-depth analysis and we think that removing that
> particular line causes correctness issues under some circumstances.
>
> Namely key duplicates can happen when multiple column families are
> processed at the same time. Not need to mention that it would cause multiple
> `readKey` calls which ends up in just wrong calculation logic (for example
> in simple sum calculation).
>
> We've a vision how this can be solved in a clean way but it will take some
> time.
>
> > Are there any plans on a migration guide or something for users to adapt
> their QS observers (beyond the current docs)?
>
> The gap between the two approaches are quite huge and considering the
> actual bugs and improvement possibilities in the state processor API
> I would say this can come later on at least on my plate. When you see the
> gaps and you know how to fill them feel free to contribute and
> we can shepherd the PRs.
>
> G
>
>
> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> Thanks both for your work on this!
>>
>> On a related note, since Queryable State (QS) is going away soon,
>> streamlining the State Processor API as much as possible makes a lot of
>> sense.
>>
>> Are there any plans on a migration guide or something for users to adapt
>> their QS observers (beyond the current docs)? (State-)Observability-wise
>> Flink has some room for improvement I would say.
>>
>> Regards,
>>
>> Salva
>>
>>
>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> Thanks for your time investment and to share the numbers, it's super
>>> helpful.
>>> Ping me any time when you have further info to share.
>>>
>>> About the numbers: 48 minutes for 6Gb is not good but not terrible.
>>> I've seen petabyte scale states so I'm pretty sure we need to go
>>> beyond...
>>>
>>> Since we measure similar numbers with the unpatched Flink plus this has
>>> been reported this by several users,
>>> we must make changes in this area. It's still a question whether the
>>> tested patch is the right approach
>>> but at least we've touched the root cause.
>>>
>>> The next step on my side is to have a deep dive and understand all the
>>> aspects why remove is there,
>>> how the remove elimination would effect existing use-cases and consider
>>> all other possibilities.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> I finally got to run that change through. I have a 6Gb savepoint I read
>>>> and parse for reference.
>>>> - HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
>>>> - RockDb with the patch reads it in 48 minutes (and requires less than
>>>> 2Gb)
>>>> - RockDb without the patch wasn't even halfway through after 12
>>>> hours.... (I gave up)
>>>>
>>>> I don't think I have any duplicates because the application that
>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>> representative. I am using IBM Seremu Java 17 (openJ9-0.46).
>>>>
>>>> That was run on a VM on my  laptop, so not exactly a
>>>> controlled environment. but I think it's conclusive enough. I will need to
>>>> run further tests but I think we will patch our Flink. using a system
>>>> property to configure it.
>>>>
>>>> Hope this help
>>>>
>>>> JM
>>>>
>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>
>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader            
>>>>>        [] - Execution time: PT14.690426S
>>>>>
>>>>> I need to double check what that would mean to correctness and all
>>>>> other aspects.
>>>>>
>>>>> G
>>>>>
>>>>>
>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Please report back on how the patch behaves including any side
>>>>>> effects.
>>>>>>
>>>>>> Now I'm in testing the state reading with processor API vs the
>>>>>> mentioned job where we control the keys.
>>>>>> The difference is extreme, especially because the numbers are coming
>>>>>> from reading ~40Mb state file😅
>>>>>>
>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader           
>>>>>>         [] - Execution time: PT22M24.612954S
>>>>>> ...
>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob        
>>>>>>         [] - Execution time: PT6.930659S
>>>>>>
>>>>>> Don't need to mention that the bigger is the processor API.
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>> wrote:
>>>>>>
>>>>>>> That's a good idea, Sadly I have no control over the keys....
>>>>>>>
>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see
>>>>>>> how that goes. If that brings RockDb performance in an acceptable range 
>>>>>>> for
>>>>>>> us we might go that way. I really like the light memory consumption of
>>>>>>> RockDb for that kind of side job.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> JM
>>>>>>>
>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>> use execution.state-recovery.path=/path/to/savepoint
>>>>>>>> set the operator UID on a custom written operator, which opens the
>>>>>>>> state info for you.
>>>>>>>>
>>>>>>>> The only drawback is that you must know the keyBy range... this can
>>>>>>>> be problematic but if you can do it it's a win :)
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Gabor,
>>>>>>>>>
>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>> pages, instead of as a single blob up front which I think is what the
>>>>>>>>> hashmap does... we just want to be called for each entry and extract 
>>>>>>>>> the
>>>>>>>>> bit we want in that scenario.
>>>>>>>>>
>>>>>>>>> Never mind
>>>>>>>>>
>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>
>>>>>>>>>> We've already realized that the RocksDB approach is not reaching
>>>>>>>>>> the performance criteria which it should be. There is an open issue 
>>>>>>>>>> for it
>>>>>>>>>> [1].
>>>>>>>>>> The hashmap based approach was and is always expecting more
>>>>>>>>>> memory. So if the memory footprint is a hard requirement then 
>>>>>>>>>> RocksDB is
>>>>>>>>>> the only way now.
>>>>>>>>>>
>>>>>>>>>> Bad to say but I can't suggest any nifty trick to make it better.
>>>>>>>>>> All I can promise that I'm now measuring performance of the RocksDB 
>>>>>>>>>> approach
>>>>>>>>>> and intended to eliminate the slowness. Since we don't know what
>>>>>>>>>> exactly causes the slowness the new Frocksdb-8.10.0 can be also an
>>>>>>>>>> imrpvement.
>>>>>>>>>>
>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>
>>>>>>>>>> BR,
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What would be the best approach to read a savepoint and minimise
>>>>>>>>>>> the memory consumption. We just need to transform it into something 
>>>>>>>>>>> else
>>>>>>>>>>> for investigation.
>>>>>>>>>>>
>>>>>>>>>>> Our flink 1.20 streaming job is using HashMap backend, and is
>>>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are 
>>>>>>>>>>> saved on S3.
>>>>>>>>>>> A savepoint can be 4-5Gb or more.
>>>>>>>>>>>
>>>>>>>>>>> The reader is more basic, using a Local Execution
>>>>>>>>>>> EnvironmentThis is essentially what we are doing:
>>>>>>>>>>>
>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>> LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>
>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new
>>>>>>>>>>> HashMapStateBackend());
>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new
>>>>>>>>>>> EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>> mainOperatorState =
>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>             new
>>>>>>>>>>> StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>> stateReader = mainOperatorState.executeAndCollect()
>>>>>>>>>>>     stateReader.forEachRemaining( record -> { ...
>>>>>>>>>>>         /// extract what we need here
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>> - One is to read the savepoint with a rockDb backend. That
>>>>>>>>>>> works and is low on memory usage, but is very very slow. We noticed 
>>>>>>>>>>> the
>>>>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend.
>>>>>>>>>>> That is very fast, as expected. However the iterator apparently only
>>>>>>>>>>> returns once the whole savepoint has been loaded in the HashMap, so 
>>>>>>>>>>> heavy
>>>>>>>>>>> memory consumption.
>>>>>>>>>>>
>>>>>>>>>>> Is there a better way to do that? or a way to tune it so that it
>>>>>>>>>>> does not consume all the memory ? or maybe reading it in parts...
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> JM
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to