Hi Gabor,

So, a bit of progress,

I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit of
chopping around deprecated/changed and removed APIs - that wasn't too bad),
but that failed to read the state I was using before (that was generated
with a Flink 1.20). This is the stack trace I get. I suspect this has more
to do with state compatibility between 1.20 and 2.1... I was surprised the
error is against a timer state. The end of the stack trace is below.

I will try to apply your patch/change/PR to our 1.20 build, but it's not a
simple git apply 😭😭😭.

$ git apply --check ~/Downloads/26134.patch.txt
error: patch failed:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
error:
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
patch does not apply
error: patch failed:
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
error:
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:
patch does not apply
error:
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java:
No such file or directory
error:
flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java:
No such file or directory
error: patch failed:
flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
error:
flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:
patch does not apply
error:
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java:
No such file or directory
error:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java:
No such file or directory
error:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java:
No such file or directory
error:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java:
No such file or directory


stack trace if relevant...

java.util.concurrent.CompletionException: java.lang.RuntimeException:
Failed to fetch next result
    at
com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88)
~[classes/:?]
    at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
~[?:?]
    at
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
~[?:?]
    at
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
~[?:?]
    at
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
~[?:?]
    at
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
~[?:?]
    at
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
~[?:?]
    at
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
~[?:?]
Caused by: java.lang.RuntimeException: Failed to fetch next result
    ... many more
Caused by: java.lang.RuntimeException: Failed to fetch next result
    ... many more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
    ... many more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
    ... many more
Caused by: java.io.IOException: Failed to restore timer state
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
Caused by: java.lang.reflect.InvocationTargetException
    at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:?]
    at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
~[?:?]
    at
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:?]
    at
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
~[?:?]
    at
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
~[?:?]
    at
com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
~[classes/:?]
    at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
Caused by: java.lang.RuntimeException: Error while getting state
    at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
~[classes/:?]
    at
com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
~[classes/:?]
    at
com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
~[classes/:?]
    at
com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
~[classes/:?]
    at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:?]
    at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
~[?:?]
    at
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:?]
    at
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
~[?:?]
    at
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
~[?:?]
    at
com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
~[classes/:?]
    at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a) must
not be incompatible with the old state serializer
(org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44).
    at
org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858)
~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763)
~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020)
~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007)
~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
~[classes/:?]
    at
com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
~[classes/:?]
    at
com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
~[classes/:?]
    at
com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
~[classes/:?]
    at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method) ~[?:?]
    at
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
~[?:?]
    at
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
~[?:?]
    at
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
~[?:?]
    at
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
~[?:?]
    at
com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
~[classes/:?]
    at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
    at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]



On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> Hi Gabor,
>
> Trying to but I struggle to compile my stuff against your Flink build...
> tried to apply your PR as a patch on my 1.20 modified fork and that didn't
> go well either. It will take time to untangle.
>
> Will keep you updated if I make progress
>
> JM
>
>
>
> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi Jean-Marc,
>>
>> FYI, I've just opened this [1] PR to address the issue in a clean way.
>> May I ask you to test it on your side?
>>
>> [1] https://github.com/apache/flink/pull/26134
>>
>> BR,
>> G
>>
>>
>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Just a little update on this. We've made our first POC with the
>>> redesigned approach and the numbers are promising :)
>>> It still requires huge efforts in development/correctness/performance
>>> perspective but seems like we have something in the pocket.
>>>
>>> Test data: 256Mb state file with a single operator and 2 value states
>>> - Old execution time: 25M27.126737S
>>> - New execution time: 1M19.602042S
>>> In short: ~95% performance gain.
>>>
>>> G
>>>
>>>
>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> In short, when you don't care about
>>>> multiple KeyedStateReaderFunction.readKey calls then you're on the safe
>>>> side.
>>>>
>>>> G
>>>>
>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> I am still hoping that I am still good. I just read the savepoint to
>>>>> extract information (parallelism 1, and only 1 task manager) . I also know
>>>>> it has been created by a job using a HashMap backend. And I do not care
>>>>> about duplicates.
>>>>>
>>>>> I should still be good, right? from what I saw I never read any
>>>>> duplicate keys.
>>>>>
>>>>> Thanks
>>>>>
>>>>> JM
>>>>>
>>>>>
>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Guys,
>>>>>>
>>>>>> We've just had an in-depth analysis and we think that removing that
>>>>>> particular line causes correctness issues under some circumstances.
>>>>>>
>>>>>> Namely key duplicates can happen when multiple column families are
>>>>>> processed at the same time. Not need to mention that it would cause 
>>>>>> multiple
>>>>>> `readKey` calls which ends up in just wrong calculation logic (for
>>>>>> example in simple sum calculation).
>>>>>>
>>>>>> We've a vision how this can be solved in a clean way but it will take
>>>>>> some time.
>>>>>>
>>>>>> > Are there any plans on a migration guide or something for users to
>>>>>> adapt their QS observers (beyond the current docs)?
>>>>>>
>>>>>> The gap between the two approaches are quite huge and considering the
>>>>>> actual bugs and improvement possibilities in the state processor API
>>>>>> I would say this can come later on at least on my plate. When you see
>>>>>> the gaps and you know how to fill them feel free to contribute and
>>>>>> we can shepherd the PRs.
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <
>>>>>> salcantara...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks both for your work on this!
>>>>>>>
>>>>>>> On a related note, since Queryable State (QS) is going away soon,
>>>>>>> streamlining the State Processor API as much as possible makes a lot of
>>>>>>> sense.
>>>>>>>
>>>>>>> Are there any plans on a migration guide or something for users to
>>>>>>> adapt their QS observers (beyond the current docs)?
>>>>>>> (State-)Observability-wise Flink has some room for improvement I would 
>>>>>>> say.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Salva
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Jean-Marc,
>>>>>>>>
>>>>>>>> Thanks for your time investment and to share the numbers, it's
>>>>>>>> super helpful.
>>>>>>>> Ping me any time when you have further info to share.
>>>>>>>>
>>>>>>>> About the numbers: 48 minutes for 6Gb is not good but not terrible.
>>>>>>>> I've seen petabyte scale states so I'm pretty sure we need to go
>>>>>>>> beyond...
>>>>>>>>
>>>>>>>> Since we measure similar numbers with the unpatched Flink plus this
>>>>>>>> has been reported this by several users,
>>>>>>>> we must make changes in this area. It's still a question whether
>>>>>>>> the tested patch is the right approach
>>>>>>>> but at least we've touched the root cause.
>>>>>>>>
>>>>>>>> The next step on my side is to have a deep dive and understand all
>>>>>>>> the aspects why remove is there,
>>>>>>>> how the remove elimination would effect existing use-cases and
>>>>>>>> consider all other possibilities.
>>>>>>>>
>>>>>>>> BR,
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Gabor,
>>>>>>>>>
>>>>>>>>> I finally got to run that change through. I have a 6Gb savepoint I
>>>>>>>>> read and parse for reference.
>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
>>>>>>>>> - RockDb with the patch reads it in 48 minutes (and requires less
>>>>>>>>> than 2Gb)
>>>>>>>>> - RockDb without the patch wasn't even halfway through after 12
>>>>>>>>> hours.... (I gave up)
>>>>>>>>>
>>>>>>>>> I don't think I have any duplicates because the application that
>>>>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>>>>> representative. I am using IBM Seremu Java 17 (openJ9-0.46).
>>>>>>>>>
>>>>>>>>> That was run on a VM on my  laptop, so not exactly a
>>>>>>>>> controlled environment. but I think it's conclusive enough. I will 
>>>>>>>>> need to
>>>>>>>>> run further tests but I think we will patch our Flink. using a system
>>>>>>>>> property to configure it.
>>>>>>>>>
>>>>>>>>> Hope this help
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>>
>>>>>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader       
>>>>>>>>>>             [] - Execution time: PT14.690426S
>>>>>>>>>>
>>>>>>>>>> I need to double check what that would mean to correctness and
>>>>>>>>>> all other aspects.
>>>>>>>>>>
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Please report back on how the patch behaves including any side
>>>>>>>>>>> effects.
>>>>>>>>>>>
>>>>>>>>>>> Now I'm in testing the state reading with processor API vs the
>>>>>>>>>>> mentioned job where we control the keys.
>>>>>>>>>>> The difference is extreme, especially because the numbers are
>>>>>>>>>>> coming from reading ~40Mb state file😅
>>>>>>>>>>>
>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader      
>>>>>>>>>>>              [] - Execution time: PT22M24.612954S
>>>>>>>>>>> ...
>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob   
>>>>>>>>>>>              [] - Execution time: PT6.930659S
>>>>>>>>>>>
>>>>>>>>>>> Don't need to mention that the bigger is the processor API.
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> That's a good idea, Sadly I have no control over the keys....
>>>>>>>>>>>>
>>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to
>>>>>>>>>>>> see how that goes. If that brings RockDb performance in an 
>>>>>>>>>>>> acceptable range
>>>>>>>>>>>> for us we might go that way. I really like the light memory 
>>>>>>>>>>>> consumption of
>>>>>>>>>>>> RockDb for that kind of side job.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> JM
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint
>>>>>>>>>>>>> set the operator UID on a custom written operator, which opens
>>>>>>>>>>>>> the state info for you.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The only drawback is that you must know the keyBy range...
>>>>>>>>>>>>> this can be problematic but if you can do it it's a win :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> G
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <
>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>>>>>> pages, instead of as a single blob up front which I think is 
>>>>>>>>>>>>>> what the
>>>>>>>>>>>>>> hashmap does... we just want to be called for each entry and 
>>>>>>>>>>>>>> extract the
>>>>>>>>>>>>>> bit we want in that scenario.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>>>> nothing.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>>>> reaching the performance criteria which it should be. There is 
>>>>>>>>>>>>>>> an open
>>>>>>>>>>>>>>> issue for it [1].
>>>>>>>>>>>>>>> The hashmap based approach was and is always expecting more
>>>>>>>>>>>>>>> memory. So if the memory footprint is a hard requirement then 
>>>>>>>>>>>>>>> RocksDB is
>>>>>>>>>>>>>>> the only way now.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Bad to say but I can't suggest any nifty trick to make it
>>>>>>>>>>>>>>> better. All I can promise that I'm now measuring performance of 
>>>>>>>>>>>>>>> the RocksDB
>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>> and intended to eliminate the slowness. Since we don't know
>>>>>>>>>>>>>>> what exactly causes the slowness the new Frocksdb-8.10.0 can be 
>>>>>>>>>>>>>>> also an
>>>>>>>>>>>>>>> imrpvement.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>>>> minimise the memory consumption. We just need to transform it 
>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>> something else for investigation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Our flink 1.20 streaming job is using HashMap backend, and
>>>>>>>>>>>>>>>> is spread over 6 task slots in 6 pods (under k8s). Savepoints 
>>>>>>>>>>>>>>>> are saved on
>>>>>>>>>>>>>>>> S3. A savepoint can be 4-5Gb or more.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The reader is more basic, using a Local Execution
>>>>>>>>>>>>>>>> EnvironmentThis is essentially what we are doing:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>> LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new
>>>>>>>>>>>>>>>> HashMapStateBackend());
>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new
>>>>>>>>>>>>>>>> EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>>>>>>> mainOperatorState =
>>>>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>>>>             new
>>>>>>>>>>>>>>>> StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem>
>>>>>>>>>>>>>>>> stateReader = mainOperatorState.executeAndCollect()
>>>>>>>>>>>>>>>>     stateReader.forEachRemaining( record -> { ...
>>>>>>>>>>>>>>>>         /// extract what we need here
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>> - One is to read the savepoint with a rockDb backend. That
>>>>>>>>>>>>>>>> works and is low on memory usage, but is very very slow. We 
>>>>>>>>>>>>>>>> noticed the
>>>>>>>>>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap
>>>>>>>>>>>>>>>> backend. That is very fast, as expected. However the iterator 
>>>>>>>>>>>>>>>> apparently
>>>>>>>>>>>>>>>> only returns once the whole savepoint has been loaded in the 
>>>>>>>>>>>>>>>> HashMap, so
>>>>>>>>>>>>>>>> heavy memory consumption.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is there a better way to do that? or a way to tune it so
>>>>>>>>>>>>>>>> that it does not consume all the memory ? or maybe reading it 
>>>>>>>>>>>>>>>> in parts...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>

Reply via email to