Hi Gabor,

Thanks for the speedy turnaround. I will rebase our Flink to capture both
Flink 1.20.1 and your PR, which I think is the only one that didn't make
the cut.

As for the migration to 2.x, this is not something I have given much thought
yet. We also have windows and TTL in the state. I have not checked yet
whether Flink has APIs in place to read 1.x state and write it back as 2.x.
I also saw mentions of possibly upgrading Kryo to a more recent version in
FLINK-3154 (Update Kryo version from 2.24.0 to latest Kryo LTS version)
<https://issues.apache.org/jira/browse/FLINK-3154>, but this is not in the
2.x codebase yet.
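
For reference, the rough shape I had in mind (untested, and only a sketch:
the operator UID, the KeyedStateItem type, and the reader/bootstrap
functions below are placeholders for our own classes) would be to rewrite
the savepoint with the 1.20 state processor API and then try to restore the
result with 2.x:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read the keyed state out of the existing 1.20 savepoint.
SavepointReader reader =
    SavepointReader.read(env, "s3://bucket/old-savepoint", new HashMapStateBackend());
DataStream<KeyedStateItem> state =
    reader.readKeyedState("main-operator-uid", new MyKeyedStateReaderFunction());

// Bootstrap a fresh savepoint from that stream; the bootstrap function
// re-registers the value/map states (and timers) under the same names.
StateBootstrapTransformation<KeyedStateItem> transformation =
    OperatorTransformation.bootstrapWith(state)
        .keyBy(KeyedStateItem::getKey)
        .transform(new MyKeyedStateBootstrapFunction());

SavepointWriter.newSavepoint(env, new HashMapStateBackend(), 128)
    .withOperator(OperatorIdentifier.forUid("main-operator-uid"), transformation)
    .write("s3://bucket/new-savepoint");

env.execute("rewrite savepoint");
```

Whether a 2.x runtime then accepts the rewritten savepoint is exactly the
part I have not verified.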

JM


On Fri, Feb 14, 2025 at 3:42 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Jean-Marc,
>
> FYI the changes are merged into master and release-1.20 so feel free to
> use it.
>
> In terms of state migration from 1.x to 2.x this can be challenging for
> the whole community so if you have some info please share.
>
> G
>
>
> On Wed, Feb 12, 2025 at 2:40 PM Jean-Marc Paulin <jm.pau...@gmail.com>
> wrote:
>
>> Hi Gabor
>>
>> Glad we helped, but thanks to you for working on this. The performance
>> improvement you made is fantastic.
>>
>> I think on our side we will wait for the final official patch, and then
>> decide if we move up to 1.20.1 (or whatever includes it) or just apply the
>> patch on what we have. Our Flink is heavily customized on top of 1.20 for
>> HA configuration and priority preferences in the TwoInputStreamOperator.
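>>
>> (For illustration, not our actual code: the priority preference boils down
>> to something like the sketch below, assuming Flink's InputSelectable
>> mechanism; the class and input roles are made up.)
>>
>> ```
>> public class PreferFirstInputOperator<IN1, IN2, OUT>
>>         extends AbstractStreamOperator<OUT>
>>         implements TwoInputStreamOperator<IN1, IN2, OUT>, InputSelectable, BoundedMultiInput {
>>
>>     private boolean firstInputEnded;
>>
>>     @Override
>>     public InputSelection nextSelection() {
>>         // Drain the priority input first, then fall back to reading both.
>>         return firstInputEnded ? InputSelection.ALL : InputSelection.FIRST;
>>     }
>>
>>     @Override
>>     public void endInput(int inputId) {
>>         if (inputId == 1) {
>>             firstInputEnded = true;
>>         }
>>     }
>>
>>     @Override
>>     public void processElement1(StreamRecord<IN1> element) throws Exception {
>>         // job-specific handling of the priority input
>>     }
>>
>>     @Override
>>     public void processElement2(StreamRecord<IN2> element) throws Exception {
>>         // job-specific handling of the secondary input
>>     }
>> }
>> ```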
>>
>> As part of this we learnt we will have to find a good way to handle state
>> migration when moving up to Flink 2.x, so that was a good learning
>> exercise for us.
>>
>> Kind regards
>>
>> JM
>>
>>
>>
>> On Wed, Feb 12, 2025 at 12:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> Thanks for your efforts! We've done quite extensive tests inside and
>>> they are all passing. Good to see that the number of keys matches on your
>>> side too.
>>> Regarding key ordering, the state processor API does not give any
>>> guarantees in terms of ordering.
>>>
>>> All in all I'm confident in the solution, so green light from my side.
>>> I intend to keep the PR open for some time to give everybody the chance
>>> to make it better or raise any kind of concerns.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Feb 12, 2025 at 1:21 PM Jean-Marc Paulin <jm.pau...@gmail.com>
>>> wrote:
>>>
>>>> Hi Gabor
>>>>
>>>> I see all my 1,331,301 keys with your patch, which is exactly what I
>>>> expected. If you have more specific concerns, I suppose I can instrument my
>>>> code further. I have no expectations regarding the key ordering.
>>>>
>>>> JM
>>>>
>>>>
>>>> On Wed, Feb 12, 2025 at 11:29 AM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> I think the hashmap vs. patched RocksDB comparison makes sense; at least
>>>>> I'm measuring similar numbers with relatively small states.
>>>>> The RocksDB run with the remove() commented out is surprisingly high, but
>>>>> since it causes correctness issues under some circumstances I would
>>>>> abandon that.
>>>>>
>>>>> > I probably need more time to ensure we definitely see all the keys,
>>>>> but that looks very good.
>>>>>
>>>>> Yeah, correctness is key here so waiting on your results.
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Wed, Feb 12, 2025 at 11:56 AM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>> wrote:
>>>>>
>>>>>> Hi Gabor,
>>>>>>
>>>>>> I applied your 1.20 patch, and I got some very good numbers from
>>>>>> it... For my 5GB savepoint, I made sure to skip all my code overhead to
>>>>>> get the raw numbers, and I can read it in:
>>>>>> - HashMap: 4 minutes
>>>>>> - RocksDB with your patch: ~19 minutes
>>>>>> - RocksDB with the remove() commented out: 49 minutes
>>>>>>
>>>>>> I am not sure these numbers make sense, and this is on a VM on my
>>>>>> laptop, so not exactly a good performance testing environment, but the
>>>>>> numbers I see are pretty good. I probably need more time to ensure we
>>>>>> definitely see all the keys, but that looks very good.
>>>>>>
>>>>>> Hope this helps
>>>>>>
>>>>>> JM
>>>>>>
>>>>>>
>>>>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: ( run 1/2)
>>>>>> ```
>>>>>> 2025-02-12 08:22:59,282 INFO  LifecycleStateExplorerTest  - Starting
>>>>>> now
>>>>>> 2025-02-12 08:22:59,283 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>>>>> Reading state from /tmp/mock_savepoint16385495638880357651.
>>>>>> 2025-02-12 08:41:29,066 INFO  LifecycleStateExplorerTest  - Completed
>>>>>> now
>>>>>> 2025-02-12 08:41:29,066 INFO  LifecycleStateExplorerTest  - Duration:
>>>>>> PT18M29.783388324S ms
>>>>>> ```
>>>>>>
>>>>>> #### using our Flink 1.20 build, which removes the
>>>>>> keysAndNamespaces.remove() line
>>>>>> ```
>>>>>> 2025-02-12 09:03:39,018 INFO  LifecycleStateExplorerTest  - Starting
>>>>>> now
>>>>>> 2025-02-12 09:03:39,024 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>>>>> Reading state from /tmp/mock_savepoint10990862094634162213.
>>>>>> 2025-02-12 09:52:51,811 INFO  LifecycleStateExplorerTest  - Completed
>>>>>> now
>>>>>> 2025-02-12 09:52:51,813 INFO  LifecycleStateExplorerTest  - Duration:
>>>>>> PT49M12.788979538S ms
>>>>>> ```
>>>>>>
>>>>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: (run 2/2)
>>>>>> ```
>>>>>> 2025-02-12 10:12:26,453 INFO  LifecycleStateExplorerTest  - Starting
>>>>>> now
>>>>>> 2025-02-12 10:12:26,458 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>>>>> Reading state from /tmp/mock_savepoint7967784813864743408.
>>>>>> 2025-02-12 10:32:20,215 INFO  LifecycleStateExplorerTest  - Completed
>>>>>> now
>>>>>> 2025-02-12 10:32:20,215 INFO  LifecycleStateExplorerTest  - Duration:
>>>>>> PT19M53.757274969S ms
>>>>>> ```
>>>>>>
>>>>>> #### using HashMapMemoryBackEnd (for reference)
>>>>>> ```
>>>>>> 2025-02-12 10:39:03,618 INFO  LifecycleStateExplorerTest  - Starting
>>>>>> now
>>>>>> 2025-02-12 10:39:03,622 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>>>>> Reading state from /tmp/mock_savepoint14340081990006677909.
>>>>>> 2025-02-12 10:43:16,454 INFO  LifecycleStateExplorerTest  - Completed
>>>>>> now
>>>>>> 2025-02-12 10:43:16,457 INFO  LifecycleStateExplorerTest  - Duration:
>>>>>> PT4M12.832810524S ms
>>>>>> ```
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 11, 2025 at 3:39 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Yeah, I think this is more like the 1.x and 2.x incompatibility.
>>>>>>> I've just opened the PR against 1.20, which you can cherry-pick here
>>>>>>> [1].
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/pull/26145
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Gabor,
>>>>>>>>
>>>>>>>> So, a bit of progress,
>>>>>>>>
>>>>>>>> I managed to compile our stuff against your 2.1-SNAPSHOT (with a
>>>>>>>> bit of chopping around deprecated/changed/removed APIs - that wasn't
>>>>>>>> too bad), but it failed to read the state I was using before (which was
>>>>>>>> generated with Flink 1.20). This is the stack trace I get. I suspect this
>>>>>>>> has more to do with state compatibility between 1.20 and 2.1... I was
>>>>>>>> surprised that the error is against a timer state. The end of the stack
>>>>>>>> trace is below.
>>>>>>>>
>>>>>>>> I will try to apply your patch/change/PR to our 1.20 build, but
>>>>>>>> it's not a simple git apply 😭😭😭.
>>>>>>>>
>>>>>>>> $ git apply --check ~/Downloads/26134.patch.txt
>>>>>>>> error: patch failed:
>>>>>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
>>>>>>>> error:
>>>>>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
>>>>>>>> patch does not apply
>>>>>>>> error: patch failed:
>>>>>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
>>>>>>>> error:
>>>>>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:
>>>>>>>> patch does not apply
>>>>>>>> error:
>>>>>>>> flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java:
>>>>>>>> No such file or directory
>>>>>>>> error:
>>>>>>>> flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java:
>>>>>>>> No such file or directory
>>>>>>>> error: patch failed:
>>>>>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
>>>>>>>> error:
>>>>>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:
>>>>>>>> patch does not apply
>>>>>>>> error:
>>>>>>>> flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java:
>>>>>>>> No such file or directory
>>>>>>>> error:
>>>>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java:
>>>>>>>> No such file or directory
>>>>>>>> error:
>>>>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java:
>>>>>>>> No such file or directory
>>>>>>>> error:
>>>>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java:
>>>>>>>> No such file or directory
>>>>>>>>
>>>>>>>>
>>>>>>>> stack trace if relevant...
>>>>>>>>
>>>>>>>> java.util.concurrent.CompletionException:
>>>>>>>> java.lang.RuntimeException: Failed to fetch next result
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
>>>>>>>> ~[?:?]
>>>>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>>>>>     ... many more
>>>>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>>>>>     ... many more
>>>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>>>>>> failed.
>>>>>>>>     ... 7 more
>>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>>>>>>> Job execution failed.
>>>>>>>>     ... many more
>>>>>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>>>>>> suppressed by NoRestartBackoffTimeStrategy
>>>>>>>>     ... many more
>>>>>>>> Caused by: java.io.IOException: Failed to restore timer state
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>>>>> Method) ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>>>>>     at
>>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>>>>> Method) ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>> Caused by: org.apache.flink.util.StateMigrationException: The new
>>>>>>>> state serializer
>>>>>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a)
>>>>>>>> must not be incompatible with the old state serializer
>>>>>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44
>>>>>>>> ).
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858)
>>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763)
>>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020)
>>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007)
>>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>>>>> Method) ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>>>>>> ~[?:?]
>>>>>>>>     at
>>>>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>>>>>> ~[classes/:?]
>>>>>>>>     at
>>>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>     at
>>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Gabor,
>>>>>>>>>
>>>>>>>>> Trying to, but I'm struggling to compile my stuff against your Flink
>>>>>>>>> build... I tried to apply your PR as a patch on my modified 1.20 fork
>>>>>>>>> and that didn't go well either. It will take time to untangle.
>>>>>>>>>
>>>>>>>>> Will keep you updated if I make progress
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <
>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>
>>>>>>>>>> FYI, I've just opened this [1] PR to address the issue in a clean
>>>>>>>>>> way.
>>>>>>>>>> May I ask you to test it on your side?
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/flink/pull/26134
>>>>>>>>>>
>>>>>>>>>> BR,
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <
>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just a little update on this. We've made our first POC with the
>>>>>>>>>>> redesigned approach and the numbers are promising :)
>>>>>>>>>>> It still requires huge effort from a
>>>>>>>>>>> development/correctness/performance perspective, but it seems we
>>>>>>>>>>> have something in the pocket.
>>>>>>>>>>>
>>>>>>>>>>> Test data: 256Mb state file with a single operator and 2 value
>>>>>>>>>>> states
>>>>>>>>>>> - Old execution time: 25M27.126737S
>>>>>>>>>>> - New execution time: 1M19.602042S
>>>>>>>>>>> In short: ~95% performance gain.
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <
>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> In short, if you don't care about multiple
>>>>>>>>>>>> KeyedStateReaderFunction.readKey calls for the same key, you're on
>>>>>>>>>>>> the safe side.
>>>>>>>>>>>> side.
>>>>>>>>>>>>
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <
>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am hoping that I am still good. I just read the savepoint to
>>>>>>>>>>>>> extract information (parallelism 1, and only one task manager).
>>>>>>>>>>>>> I also know it was created by a job using a HashMap backend, and
>>>>>>>>>>>>> I do not care about duplicates.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I should still be good, right? From what I saw, I never read
>>>>>>>>>>>>> any duplicate keys.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <
>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Guys,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We've just had an in-depth analysis and we think that
>>>>>>>>>>>>>> removing that particular line causes correctness issues under 
>>>>>>>>>>>>>> some
>>>>>>>>>>>>>> circumstances.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Namely, key duplicates can happen when multiple column families
>>>>>>>>>>>>>> are processed at the same time. Needless to say, that would cause
>>>>>>>>>>>>>> multiple `readKey` calls, which ends up in plainly wrong
>>>>>>>>>>>>>> calculation logic (for example in a simple sum calculation).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have a vision of how this can be solved in a clean way, but it
>>>>>>>>>>>>>> will take some time.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Are there any plans on a migration guide or something for
>>>>>>>>>>>>>> users to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The gap between the two approaches is quite huge, and considering
>>>>>>>>>>>>>> the actual bugs and improvement possibilities in the state
>>>>>>>>>>>>>> processor API, I would say this can come later, at least on my
>>>>>>>>>>>>>> plate. When you see the gaps and you know how to fill them, feel
>>>>>>>>>>>>>> free to contribute and we can shepherd the PRs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <
>>>>>>>>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks both for your work on this!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On a related note, since Queryable State (QS) is going away
>>>>>>>>>>>>>>> soon, streamlining the State Processor API as much as possible 
>>>>>>>>>>>>>>> makes a lot
>>>>>>>>>>>>>>> of sense.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Are there any plans for a migration guide or something for
>>>>>>>>>>>>>>> users to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>>>>>> (State-)observability-wise, Flink has some room for improvement,
>>>>>>>>>>>>>>> I would say.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Salva
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <
>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for investing your time and sharing the numbers;
>>>>>>>>>>>>>>>> it's super helpful.
>>>>>>>>>>>>>>>> Ping me any time when you have further info to share.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> About the numbers: 48 minutes for 6Gb is not good but not
>>>>>>>>>>>>>>>> terrible.
>>>>>>>>>>>>>>>> I've seen petabyte-scale states, so I'm pretty sure we need
>>>>>>>>>>>>>>>> to go beyond...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Since we measure similar numbers with the unpatched Flink,
>>>>>>>>>>>>>>>> and this has been reported by several users,
>>>>>>>>>>>>>>>> we must make changes in this area. It's still a question
>>>>>>>>>>>>>>>> whether the tested patch is the right approach
>>>>>>>>>>>>>>>> but at least we've touched the root cause.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The next step on my side is to take a deep dive and
>>>>>>>>>>>>>>>> understand all the aspects of why the remove is there,
>>>>>>>>>>>>>>>> how eliminating it would affect existing use-cases,
>>>>>>>>>>>>>>>> and consider all other possibilities.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <
>>>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I finally got to run that change through. I have a 6Gb
>>>>>>>>>>>>>>>>> savepoint I read and parse for reference.
>>>>>>>>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10Gb of RAM)
>>>>>>>>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires
>>>>>>>>>>>>>>>>> less than 2Gb)
>>>>>>>>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after
>>>>>>>>>>>>>>>>> 12 hours.... (I gave up)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't think I have any duplicates, because the application
>>>>>>>>>>>>>>>>> that generates the savepoint is using HashMap, so my scenario
>>>>>>>>>>>>>>>>> may not be representative. I am using IBM Semeru Java 17
>>>>>>>>>>>>>>>>> (openJ9-0.46).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> That was run on a VM on my laptop, so not exactly a controlled
>>>>>>>>>>>>>>>>> environment, but I think it's conclusive enough. I will need to
>>>>>>>>>>>>>>>>> run further tests, but I think we will patch our Flink, using a
>>>>>>>>>>>>>>>>> system property to configure it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hope this helps
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Just to give an update. I've applied the mentioned patch
>>>>>>>>>>>>>>>>>> and the execution time drastically decreased (the gain is 
>>>>>>>>>>>>>>>>>> 98.9%):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO  
>>>>>>>>>>>>>>>>>> o.a.f.e.s.r.FlinkTestStateReader                   [] - 
>>>>>>>>>>>>>>>>>> Execution time: PT14.690426S
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I need to double-check what that would mean for
>>>>>>>>>>>>>>>>>> correctness and all other aspects.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Please report back on how the patch behaves, including
>>>>>>>>>>>>>>>>>>> any side effects.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Now I'm testing state reading with the processor API
>>>>>>>>>>>>>>>>>>> vs. the mentioned job where we control the keys.
>>>>>>>>>>>>>>>>>>> The difference is extreme, especially because the
>>>>>>>>>>>>>>>>>>> numbers come from reading a ~40Mb state file 😅
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO  
>>>>>>>>>>>>>>>>>>> o.a.f.e.s.r.FlinkTestStateReader                   [] - 
>>>>>>>>>>>>>>>>>>> Execution time: PT22M24.612954S
>>>>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO  
>>>>>>>>>>>>>>>>>>> o.a.f.e.s.r.FlinkTestStateReaderJob                [] - 
>>>>>>>>>>>>>>>>>>> Execution time: PT6.930659S
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Needless to say, the bigger number is the processor
>>>>>>>>>>>>>>>>>>> API.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <
>>>>>>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> That's a good idea. Sadly, I have no control over the
>>>>>>>>>>>>>>>>>>>> keys....
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I was going to patch Flink with the suggestion in
>>>>>>>>>>>>>>>>>>>> FLINK-37109
>>>>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first
>>>>>>>>>>>>>>>>>>>> to see how that goes. If that brings RocksDB performance
>>>>>>>>>>>>>>>>>>>> into an acceptable range for us, we might go that way. I
>>>>>>>>>>>>>>>>>>>> really like the light memory consumption of RocksDB for
>>>>>>>>>>>>>>>>>>>> that kind of side job.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>>>>>>>>>>>>>>>>>> set the operator UID on a custom-written operator, which
>>>>>>>>>>>>>>>>>>>>> opens the state info for you.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The only drawback is that you must know the keyBy
>>>>>>>>>>>>>>>>>>>>> range... this can be problematic, but if you can do it,
>>>>>>>>>>>>>>>>>>>>> it's a win :)
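>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Untested sketch of the idea (the UID, key type, state name,
>>>>>>>>>>>>>>>>>>>>> and the driver sequence are all made up; the driver must
>>>>>>>>>>>>>>>>>>>>> cover the keyBy range of the savepoint):
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>>>>>> Configuration conf = new Configuration();
>>>>>>>>>>>>>>>>>>>>> conf.setString("execution.state-recovery.path", "/path/to/savepoint");
>>>>>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment(conf);
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> long maxKey = 1_000_000L; // hypothetical upper bound of the key space
>>>>>>>>>>>>>>>>>>>>> env.fromSequence(0, maxKey) // driver records covering the keyBy range
>>>>>>>>>>>>>>>>>>>>>     .keyBy(k -> k)
>>>>>>>>>>>>>>>>>>>>>     .process(new KeyedProcessFunction<Long, Long, String>() {
>>>>>>>>>>>>>>>>>>>>>         private transient ValueState<String> state;
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>>>>>         public void open(Configuration parameters) {
>>>>>>>>>>>>>>>>>>>>>             state = getRuntimeContext()
>>>>>>>>>>>>>>>>>>>>>                 .getState(new ValueStateDescriptor<>("my-state", String.class));
>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>>>>>         public void processElement(Long key, Context ctx, Collector<String> out)
>>>>>>>>>>>>>>>>>>>>>                 throws Exception {
>>>>>>>>>>>>>>>>>>>>>             out.collect(key + " -> " + state.value()); // restored from the savepoint
>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>     })
>>>>>>>>>>>>>>>>>>>>>     .uid("main-operator-uid") // must match the UID in the savepoint
>>>>>>>>>>>>>>>>>>>>>     .print();
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> env.execute();
>>>>>>>>>>>>>>>>>>>>> ```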
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <
>>>>>>>>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the
>>>>>>>>>>>>>>>>>>>>>> savepoint in pages, instead of as a single blob up front,
>>>>>>>>>>>>>>>>>>>>>> which I think is what the hashmap does... we just want to
>>>>>>>>>>>>>>>>>>>>>> be called for each entry and extract the bit we want in
>>>>>>>>>>>>>>>>>>>>>> that scenario.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting
>>>>>>>>>>>>>>>>>>>>>> for nothing.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is
>>>>>>>>>>>>>>>>>>>>>>> not reaching the performance criteria it should. There is an open
>>>>>>>>>>>>>>>>>>>>>>> be. There is an open
>>>>>>>>>>>>>>>>>>>>>>> issue for it [1].
>>>>>>>>>>>>>>>>>>>>>>> The hashmap based approach was and is always
>>>>>>>>>>>>>>>>>>>>>>> expecting more memory. So if the memory footprint is a 
>>>>>>>>>>>>>>>>>>>>>>> hard requirement
>>>>>>>>>>>>>>>>>>>>>>> then RocksDB is the only way now.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Bad to say but I can't suggest any nifty trick to
>>>>>>>>>>>>>>>>>>>>>>> make it better. All I can promise that I'm now 
>>>>>>>>>>>>>>>>>>>>>>> measuring performance of the
>>>>>>>>>>>>>>>>>>>>>>> RocksDB approach
>>>>>>>>>>>>>>>>>>>>>>> and intended to eliminate the slowness. Since we
>>>>>>>>>>>>>>>>>>>>>>> don't know what exactly causes the slowness the new 
>>>>>>>>>>>>>>>>>>>>>>> Frocksdb-8.10.0 can be
>>>>>>>>>>>>>>>>>>>>>>> also an improvement.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint
>>>>>>>>>>>>>>>>>>>>>>>> and minimise the memory consumption. We just need to 
>>>>>>>>>>>>>>>>>>>>>>>> transform it into
>>>>>>>>>>>>>>>>>>>>>>>> something else for investigation.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Our flink 1.20 streaming job is using HashMap
>>>>>>>>>>>>>>>>>>>>>>>> backend, and is spread over 6 task slots in 6 pods 
>>>>>>>>>>>>>>>>>>>>>>>> (under k8s). Savepoints
>>>>>>>>>>>>>>>>>>>>>>>> are saved on S3. A savepoint can be 4-5Gb or more.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>>>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>>>>>>>>>     stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>>>>>>>>         // extract what we need here
>>>>>>>>>>>>>>>>>>>>>>>>     });
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB
>>>>>>>>>>>>>>>>>>>>>>>> backend. That works and is low on memory usage, but is
>>>>>>>>>>>>>>>>>>>>>>>> very, very slow. We
>>>>>>>>>>>>>>>>>>>>>>>> noticed the iterator is available very early on, but
>>>>>>>>>>>>>>>>>>>>>>>> it is slow...
>>>>>>>>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a
>>>>>>>>>>>>>>>>>>>>>>>> HashMap backend. That is very fast, as expected. 
>>>>>>>>>>>>>>>>>>>>>>>> However the iterator
>>>>>>>>>>>>>>>>>>>>>>>> apparently only returns once the whole savepoint has 
>>>>>>>>>>>>>>>>>>>>>>>> been loaded in the
>>>>>>>>>>>>>>>>>>>>>>>> HashMap, so heavy memory consumption.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune
>>>>>>>>>>>>>>>>>>>>>>>> it so that it does not consume all the memory? Or
>>>>>>>>>>>>>>>>>>>>>>>> maybe reading it in
>>>>>>>>>>>>>>>>>>>>>>>> parts...
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
