Yeah, I think this is more like the 1.x and 2.x incompatibility.
I've just opened the PR against 1.20, which you can cherry-pick here [1].

[1] https://github.com/apache/flink/pull/26145

BR,
G


On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> Hi Gabor,
>
> So, a bit of progress,
>
> I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit of
> chopping around deprecated/changed and removed APIs - that wasn't too bad),
> but that failed to read the state I was using before (that was generated
> with a Flink 1.20). This is the stack trace I get. I suspect this has more
> to do with state compatibility between 1.20 and 2.1... I was surprised the
> error is against a timer state. The end of the stack trace is below.
>
> I will try to apply your patch/change/PR to our 1.20 build, but it's not a
> simple git apply 😭😭😭.
>
> $ git apply --check ~/Downloads/26134.patch.txt
> error: patch failed:
> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
> error:
> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
> patch does not apply
> error: patch failed:
> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
> error:
> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:
> patch does not apply
> error:
> flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java:
> No such file or directory
> error:
> flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java:
> No such file or directory
> error: patch failed:
> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
> error:
> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:
> patch does not apply
> error:
> flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java:
> No such file or directory
> error:
> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java:
> No such file or directory
> error:
> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java:
> No such file or directory
> error:
> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java:
> No such file or directory
>
>
> stack trace if relevant...
>
> java.util.concurrent.CompletionException: java.lang.RuntimeException:
> Failed to fetch next result
>     at
> com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88)
> ~[classes/:?]
>     at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
> ~[?:?]
>     at
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
> ~[?:?]
>     at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> ~[?:?]
>     at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> ~[?:?]
>     at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> ~[?:?]
>     at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> ~[?:?]
>     at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> ~[?:?]
> Caused by: java.lang.RuntimeException: Failed to fetch next result
>     ... many more
> Caused by: java.lang.RuntimeException: Failed to fetch next result
>     ... many more
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>     ... many more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>     ... many more
> Caused by: java.io.IOException: Failed to restore timer state
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
> Caused by: java.lang.reflect.InvocationTargetException
>     at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method) ~[?:?]
>     at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
> ~[?:?]
>     at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[?:?]
>     at
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
> ~[?:?]
>     at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
> ~[?:?]
>     at
> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
> ~[classes/:?]
>     at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
> Caused by: java.lang.RuntimeException: Error while getting state
>     at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
> ~[classes/:?]
>     at
> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
> ~[classes/:?]
>     at
> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
> ~[classes/:?]
>     at
> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
> ~[classes/:?]
>     at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method) ~[?:?]
>     at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
> ~[?:?]
>     at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[?:?]
>     at
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
> ~[?:?]
>     at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
> ~[?:?]
>     at
> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
> ~[classes/:?]
>     at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a) must
> not be incompatible with the old state serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44).
>     at
> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858)
> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763)
> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020)
> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007)
> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
> ~[classes/:?]
>     at
> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
> ~[classes/:?]
>     at
> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
> ~[classes/:?]
>     at
> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
> ~[classes/:?]
>     at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method) ~[?:?]
>     at
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
> ~[?:?]
>     at
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> ~[?:?]
>     at
> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
> ~[?:?]
>     at
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
> ~[?:?]
>     at
> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
> ~[classes/:?]
>     at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>
>
>
> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi Gabor,
>>
>> Trying to but I struggle to compile my stuff against your Flink build...
>> tried to apply your PR as a patch on my 1.20 modified fork and that didn't
>> go well either. It will take time to untangle.
>>
>> Will keep you updated if I make progress
>>
>> JM
>>
>>
>>
>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> FYI, I've just opened this [1] PR to address the issue in a clean way.
>>> May I ask you to test it on your side?
>>>
>>> [1] https://github.com/apache/flink/pull/26134
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Just a little update on this. We've made our first POC with the
>>>> redesigned approach and the numbers are promising :)
>>>> It still requires a huge effort from a development/correctness/performance
>>>> perspective, but it seems like we have something in the pocket.
>>>>
>>>> Test data: 256 MB state file with a single operator and 2 value states
>>>> - Old execution time: 25M27.126737S
>>>> - New execution time: 1M19.602042S
>>>> In short: ~95% performance gain.
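The "~95%" figure can be checked from the two execution times quoted above. The following is only a small arithmetic sketch, not part of the POC: the class and method names are made up for illustration, and it assumes the quoted times are ISO-8601 durations with the leading "PT" dropped (the log lines elsewhere in this thread print them with "PT").

```java
import java.time.Duration;

// Hypothetical helper, for illustration only.
public class SpeedupCheck {

    /** Percentage by which the execution time shrinks going from oldTime to newTime. */
    static double reductionPercent(Duration oldTime, Duration newTime) {
        double oldSeconds = oldTime.toNanos() / 1e9;
        double newSeconds = newTime.toNanos() / 1e9;
        return (1.0 - newSeconds / oldSeconds) * 100.0;
    }

    /** How many times faster the new run is compared to the old one. */
    static double speedupFactor(Duration oldTime, Duration newTime) {
        return (double) oldTime.toNanos() / newTime.toNanos();
    }

    public static void main(String[] args) {
        // Assumption: "25M27.126737S" and "1M19.602042S" are ISO-8601 durations without "PT".
        Duration oldTime = Duration.parse("PT25M27.126737S");
        Duration newTime = Duration.parse("PT1M19.602042S");

        System.out.printf("Reduction: %.1f%%%n", reductionPercent(oldTime, newTime));
        System.out.printf("Speedup:   %.1fx%n", speedupFactor(oldTime, newTime));
    }
}
```

With these inputs the reduction comes out a little under 95%, which corresponds to roughly a 19x speedup.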
>>>>
>>>> G
>>>>
>>>>
>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>> wrote:
>>>>
>>>>> In short, as long as you don't care about
>>>>> multiple KeyedStateReaderFunction.readKey calls for the same key, you're
>>>>> on the safe side.
>>>>>
>>>>> G
>>>>>
>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>> wrote:
>>>>>
>>>>>> I am still hoping that I am still good. I just read the savepoint to
>>>>>> extract information (parallelism 1, and only 1 task manager). I also
>>>>>> know it has been created by a job using a HashMap backend. And I do not
>>>>>> care about duplicates.
>>>>>>
>>>>>> I should still be good, right? From what I saw, I never read any
>>>>>> duplicate keys.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> JM
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> We've just had an in-depth analysis and we think that removing that
>>>>>>> particular line causes correctness issues under some circumstances.
>>>>>>>
>>>>>>> Namely, key duplicates can happen when multiple column families are
>>>>>>> processed at the same time. Needless to say, that would cause multiple
>>>>>>> `readKey` calls, which ends up in plainly wrong calculation logic (for
>>>>>>> example in a simple sum calculation).
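To make the sum example concrete, here is a plain-Java simulation of the effect. It is not Flink code and does not reflect how the State Processor API iterates keys internally; the class, the method names and the data are hypothetical. Each inner list stands for the keys emitted while scanning one column family, and the loop body stands in for one `readKey` call.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation, for illustration only.
public class DuplicateKeyDemo {

    /** One "readKey" per emitted key: a key present in two column families is counted twice. */
    static long sumWithoutDedup(List<List<String>> keysPerColumnFamily, Map<String, Long> valueByKey) {
        long sum = 0;
        for (List<String> keys : keysPerColumnFamily) {
            for (String key : keys) {
                sum += valueByKey.get(key);
            }
        }
        return sum;
    }

    /** One "readKey" per distinct key: already-seen keys are skipped. */
    static long sumWithDedup(List<List<String>> keysPerColumnFamily, Map<String, Long> valueByKey) {
        Set<String> seen = new HashSet<>();
        long sum = 0;
        for (List<String> keys : keysPerColumnFamily) {
            for (String key : keys) {
                if (seen.add(key)) { // add() returns false when the key was seen before
                    sum += valueByKey.get(key);
                }
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Made-up data: key "a" appears in both column families, key "b" in only one.
        List<List<String>> families = List.of(List.of("a", "b"), List.of("a"));
        Map<String, Long> values = Map.of("a", 10L, "b", 5L);

        System.out.println(sumWithoutDedup(families, values)); // prints 25
        System.out.println(sumWithDedup(families, values));    // prints 15
    }
}
```

A reader that only extracts or dumps per-key information is not hurt by the duplicate call in the same way, whereas anything that aggregates across `readKey` calls is.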
>>>>>>>
>>>>>>> We have a vision of how this can be solved in a clean way, but it will
>>>>>>> take some time.
>>>>>>>
>>>>>>> > Are there any plans on a migration guide or something for users to
>>>>>>> adapt their QS observers (beyond the current docs)?
>>>>>>>
>>>>>>> The gap between the two approaches is quite large, and considering
>>>>>>> the actual bugs and improvement possibilities in the State Processor API,
>>>>>>> I would say this can come later on, at least on my plate. When you
>>>>>>> see the gaps and you know how to fill them, feel free to contribute and
>>>>>>> we can shepherd the PRs.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <
>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks both for your work on this!
>>>>>>>>
>>>>>>>> On a related note, since Queryable State (QS) is going away soon,
>>>>>>>> streamlining the State Processor API as much as possible makes a lot of
>>>>>>>> sense.
>>>>>>>>
>>>>>>>> Are there any plans on a migration guide or something for users to
>>>>>>>> adapt their QS observers (beyond the current docs)?
>>>>>>>> (State-)Observability-wise, Flink has some room for improvement, I
>>>>>>>> would say.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Salva
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <
>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>
>>>>>>>>> Thanks for investing your time and for sharing the numbers, it's
>>>>>>>>> super helpful.
>>>>>>>>> Ping me any time when you have further info to share.
>>>>>>>>>
>>>>>>>>> About the numbers: 48 minutes for 6 GB is not good but not terrible.
>>>>>>>>> I've seen petabyte scale states so I'm pretty sure we need to go
>>>>>>>>> beyond...
>>>>>>>>>
>>>>>>>>> Since we measure similar numbers with the unpatched Flink, and
>>>>>>>>> this has been reported by several users,
>>>>>>>>> we must make changes in this area. It's still a question whether
>>>>>>>>> the tested patch is the right approach,
>>>>>>>>> but at least we've touched the root cause.
>>>>>>>>>
>>>>>>>>> The next step on my side is to do a deep dive and understand all
>>>>>>>>> the aspects of why the remove is there,
>>>>>>>>> how eliminating the remove would affect existing use-cases, and
>>>>>>>>> consider all other possibilities.
>>>>>>>>>
>>>>>>>>> BR,
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gabor,
>>>>>>>>>>
>>>>>>>>>> I finally got to run that change through. I have a 6 GB savepoint
>>>>>>>>>> I read and parse for reference.
>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less
>>>>>>>>>> than 2 GB)
>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after 12
>>>>>>>>>> hours... (I gave up)
>>>>>>>>>>
>>>>>>>>>> I don't think I have any duplicates because the application that
>>>>>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>>>>>> representative. I am using IBM Semeru Java 17 (OpenJ9-0.46).
>>>>>>>>>>
>>>>>>>>>> That was run on a VM on my laptop, so not exactly a
>>>>>>>>>> controlled environment, but I think it's conclusive enough. I will
>>>>>>>>>> need to run further tests, but I think we will patch our Flink, using
>>>>>>>>>> a system property to configure it.
>>>>>>>>>>
>>>>>>>>>> Hope this helps
>>>>>>>>>>
>>>>>>>>>> JM
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>>>
>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>>>
>>>>>>>>>>> I need to double-check what that would mean for correctness and
>>>>>>>>>>> all other aspects.
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Please report back on how the patch behaves including any side
>>>>>>>>>>>> effects.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm now testing state reading with the State Processor API vs the
>>>>>>>>>>>> mentioned job where we control the keys.
>>>>>>>>>>>> The difference is extreme, especially because the numbers are
>>>>>>>>>>>> coming from reading a ~40 MB state file 😅
>>>>>>>>>>>>
>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>>>> ...
>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>>>
>>>>>>>>>>>> Needless to say, the bigger number is the State Processor API.
>>>>>>>>>>>>
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <
>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> That's a good idea. Sadly, I have no control over the keys...
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to
>>>>>>>>>>>>> see how that goes. If that brings RocksDB performance into an
>>>>>>>>>>>>> acceptable range for us, we might go that way. I really like the
>>>>>>>>>>>>> light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>>>>>>>>>>> set the operator UID on a custom-written operator, which
>>>>>>>>>>>>>> opens the state info for you.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The only drawback is that you must know the keyBy range...
>>>>>>>>>>>>>> This can be problematic, but if you can do it, it's a win :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <
>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint
>>>>>>>>>>>>>>> in pages, instead of as a single blob up front, which I think is
>>>>>>>>>>>>>>> what the HashMap backend does... We just want to be called for each
>>>>>>>>>>>>>>> entry and extract the bit we want in that scenario.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>>>>> nothing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>>>>> reaching the performance it should. There is an open
>>>>>>>>>>>>>>>> issue for it [1].
>>>>>>>>>>>>>>>> The HashMap-based approach has always required more
>>>>>>>>>>>>>>>> memory. So if the memory footprint is a hard requirement, then
>>>>>>>>>>>>>>>> RocksDB is the only way for now.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it
>>>>>>>>>>>>>>>> better. All I can promise is that I'm now measuring the performance
>>>>>>>>>>>>>>>> of the RocksDB approach
>>>>>>>>>>>>>>>> and intend to eliminate the slowness. Since we don't know
>>>>>>>>>>>>>>>> what exactly causes the slowness, the new Frocksdb-8.10.0 may
>>>>>>>>>>>>>>>> also be an improvement.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>>>>> minimise the memory consumption? We just need to transform it
>>>>>>>>>>>>>>>>> into something else for investigation.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and
>>>>>>>>>>>>>>>>> is spread over 6 task slots in 6 pods (under k8s). Savepoints
>>>>>>>>>>>>>>>>> are saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation,
>>>>>>>>>>>>>>>>>     //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>>     stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>         // ... extract what we need here
>>>>>>>>>>>>>>>>>     });
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend.
>>>>>>>>>>>>>>>>> That works and is low on memory usage, but is very, very slow.
>>>>>>>>>>>>>>>>> We noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap
>>>>>>>>>>>>>>>>> backend. That is very fast, as expected. However, the iterator
>>>>>>>>>>>>>>>>> apparently only returns once the whole savepoint has been loaded
>>>>>>>>>>>>>>>>> into the HashMap, so memory consumption is heavy.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so
>>>>>>>>>>>>>>>>> that it does not consume all the memory? Or maybe reading it
>>>>>>>>>>>>>>>>> in parts...
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
