Hi Gabor,

Thanks for the speedy turnaround. I will rebase our Flink to pick up both Flink 1.20.1 and your PR, which I think is the only one that didn't make the cut.

As for the migration to 2.x, this is not something I have given much thought yet. We also have windows and TTL in the state. I have not yet checked whether Flink has APIs in place to read a 1.x state and write a 2.x one; a rough sketch of the direction I would try is below. I also saw mentions of maybe upgrading Kryo to a more recent version in [FLINK-3154] Update Kryo version from 2.24.0 to latest Kryo LTS version - ASF JIRA <https://issues.apache.org/jira/browse/FLINK-3154>, but this is not in the 2.x codebase yet.
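For what it's worth, this is the kind of thing I was picturing with the 1.20 State Processor API: read the keyed state out of the old savepoint and bootstrap a brand-new savepoint from it. The sketch is untested and purely illustrative - the operator UID "main-operator", the state name "count", and all the types are made-up placeholders - and whether a 2.x job can actually restore the result (windows, TTL, Kryo-serialized types) is exactly the open question:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointRewrite {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1) Read one keyed ValueState out of the existing (1.x) savepoint.
        SavepointReader oldSavepoint =
                SavepointReader.read(env, "s3://bucket/old-savepoint", new HashMapStateBackend());
        DataStream<Tuple2<String, Long>> entries =
                oldSavepoint.readKeyedState(OperatorIdentifier.forUid("main-operator"), new ReaderFn());

        // 2) Re-bootstrap the same entries into a fresh savepoint.
        StateBootstrapTransformation<Tuple2<String, Long>> bootstrap =
                OperatorTransformation.bootstrapWith(entries)
                        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> t) {
                                return t.f0;
                            }
                        })
                        .transform(new WriterFn());

        SavepointWriter.newSavepoint(env, new HashMapStateBackend(), 128) // max parallelism of the original job
                .withOperator(OperatorIdentifier.forUid("main-operator"), bootstrap)
                .write("s3://bucket/new-savepoint");

        env.execute("savepoint-rewrite");
    }

    /** Emits one (key, count) pair per key found in the old savepoint. */
    static class ReaderFn extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            out.collect(Tuple2.of(key, count.value()));
        }
    }

    /** Writes each (key, count) pair back into an identically named ValueState. */
    static class WriterFn extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            count.update(value.f1);
        }
    }
}
```

Even if a rewrite like this works mechanically, anything Kryo-serialized would presumably still need compatible serializers registered on the 2.x side, and the timer/window state may need its own treatment.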
JM

On Fri, Feb 14, 2025 at 3:42 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Jean-Marc,
>
> FYI the changes are merged into master and release-1.20, so feel free to use them.
>
> In terms of state migration from 1.x to 2.x, this can be challenging for the whole community, so if you have some info please share.
>
> G
>
> On Wed, Feb 12, 2025 at 2:40 PM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>
>> Hi Gabor
>>
>> Glad we helped, but thanks to you for working on this. The performance improvement you made is fantastic.
>>
>> I think on our side we will wait for the final official patch, and then decide if we move up to 1.20.1 (or whatever includes it) or just apply the patch on what we have. Our Flink is heavily customized on top of 1.20 for HA configuration and priority preferences in the TwoInputStreamOperator.
>>
>> As part of this we learnt we will have to find a good way to handle state migration when moving up to Flink 2.x, so that was a good learning exercise for us.
>>
>> Kind regards
>>
>> JM
>>
>> On Wed, Feb 12, 2025 at 12:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> Thanks for your efforts! We've done quite extensive tests internally and they are all passing. Good to see that the number of keys matches on your side too. Regarding key ordering, the state processor API does not give any guarantees in terms of ordering.
>>>
>>> All in all I'm confident in the solution, so green light from my side. I intend to leave the PR open for some time so that everybody has the possibility to make it better or raise any kind of concerns.
>>>
>>> BR,
>>> G
>>>
>>> On Wed, Feb 12, 2025 at 1:21 PM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>
>>>> Hi Gabor
>>>>
>>>> I see all my 1,331,301 keys with your patch, which is exactly what I am expecting. If you have more specific concerns I suppose I can instrument my code further. I have no expectation regarding the key ordering.
>>>>
>>>> JM
>>>>
>>>> On Wed, Feb 12, 2025 at 11:29 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> I think the hashmap vs patched RocksDB comparison makes sense; at least I'm measuring similar numbers with relatively small states. The RocksDB number with the remove() commented out is surprisingly high, but since that change causes correctness issues under some circumstances I would abandon it.
>>>>>
>>>>> > I probably need more time to ensure we definitely see all the keys, but that looks very good.
>>>>>
>>>>> Yeah, correctness is key here, so waiting on your results.
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>> On Wed, Feb 12, 2025 at 11:56 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>
>>>>>> Hi Gabor,
>>>>>>
>>>>>> I applied your 1.20 patch, and I got some very good numbers from it.
>>>>>> For my 5GB savepoint, I made sure I skipped all my code overhead to get the raw numbers, and I can read it in:
>>>>>> - HashMap: 4 minutes
>>>>>> - RocksDB with your patch: ~19 minutes
>>>>>> - RocksDB with the remove() commented out: 49 minutes
>>>>>>
>>>>>> I am not sure these numbers make sense, and this is on a VM on my laptop, so not exactly a good performance testing environment, but the numbers I see are pretty good. I probably need more time to ensure we definitely see all the keys, but that looks very good.
>>>>>>
>>>>>> Hope this helps
>>>>>>
>>>>>> JM
>>>>>>
>>>>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: (run 1/2)
>>>>>> ```
>>>>>> 2025-02-12 08:22:59,282 INFO LifecycleStateExplorerTest - Starting now
>>>>>> 2025-02-12 08:22:59,283 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint16385495638880357651.
>>>>>> 2025-02-12 08:41:29,066 INFO LifecycleStateExplorerTest - Completed now
>>>>>> 2025-02-12 08:41:29,066 INFO LifecycleStateExplorerTest - Duration: PT18M29.783388324S ms
>>>>>> ```
>>>>>>
>>>>>> #### using our flink 1.20 build, which removes the keysAndNamespaces.remove(); line
>>>>>> ```
>>>>>> 2025-02-12 09:03:39,018 INFO LifecycleStateExplorerTest - Starting now
>>>>>> 2025-02-12 09:03:39,024 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint10990862094634162213.
>>>>>> 2025-02-12 09:52:51,811 INFO LifecycleStateExplorerTest - Completed now
>>>>>> 2025-02-12 09:52:51,813 INFO LifecycleStateExplorerTest - Duration: PT49M12.788979538S ms
>>>>>> ```
>>>>>>
>>>>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: (run 2/2)
>>>>>> ```
>>>>>> 2025-02-12 10:12:26,453 INFO LifecycleStateExplorerTest - Starting now
>>>>>> 2025-02-12 10:12:26,458 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint7967784813864743408.
>>>>>> 2025-02-12 10:32:20,215 INFO LifecycleStateExplorerTest - Completed now
>>>>>> 2025-02-12 10:32:20,215 INFO LifecycleStateExplorerTest - Duration: PT19M53.757274969S ms
>>>>>> ```
>>>>>>
>>>>>> #### using HashMapMemoryBackEnd (for reference)
>>>>>> ```
>>>>>> 2025-02-12 10:39:03,618 INFO LifecycleStateExplorerTest - Starting now
>>>>>> 2025-02-12 10:39:03,622 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint14340081990006677909.
>>>>>> 2025-02-12 10:43:16,454 INFO LifecycleStateExplorerTest - Completed now
>>>>>> 2025-02-12 10:43:16,457 INFO LifecycleStateExplorerTest - Duration: PT4M12.832810524S ms
>>>>>> ```
>>>>>>
>>>>>> On Tue, Feb 11, 2025 at 3:39 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Yeah, I think this is more like the 1.x and 2.x incompatibility. I've just opened the PR against 1.20, which you can cherry-pick here [1].
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/pull/26145
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>> On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi Gabor,
>>>>>>>>
>>>>>>>> So, a bit of progress.
>>>>>>>>
>>>>>>>> I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit of chopping around deprecated/changed and removed APIs - that wasn't too bad), but that failed to read the state I was using before (that was generated with a Flink 1.20).
This is the stack trace I get. I suspect >>>>>>>> this >>>>>>>> has more to do with state compatibility between 1.20 and 2.1... I was >>>>>>>> surprised the error is against a timer state. The end of the stack >>>>>>>> trace is >>>>>>>> below. >>>>>>>> >>>>>>>> I will try to apply your patch/change/PR to our 1.20 build, but >>>>>>>> it's not a simple git apply 😭😭😭. >>>>>>>> >>>>>>>> $ git apply --check ~/Downloads/26134.patch.txt >>>>>>>> error: patch failed: >>>>>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226 >>>>>>>> error: >>>>>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java: >>>>>>>> patch does not apply >>>>>>>> error: patch failed: >>>>>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24 >>>>>>>> error: >>>>>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java: >>>>>>>> patch does not apply >>>>>>>> error: >>>>>>>> flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java: >>>>>>>> No such file or directory >>>>>>>> error: >>>>>>>> flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java: >>>>>>>> No such file or directory >>>>>>>> error: patch failed: >>>>>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22 >>>>>>>> error: >>>>>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java: >>>>>>>> patch does not apply >>>>>>>> error: >>>>>>>> flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java: >>>>>>>> No such file or directory >>>>>>>> error: >>>>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java: >>>>>>>> No such file or directory >>>>>>>> error: >>>>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java: >>>>>>>> No such file or directory >>>>>>>> error: >>>>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java: >>>>>>>> No such file or directory >>>>>>>> >>>>>>>> >>>>>>>> stack trace if relevant... >>>>>>>> >>>>>>>> java.util.concurrent.CompletionException: >>>>>>>> java.lang.RuntimeException: Failed to fetch next result >>>>>>>> at >>>>>>>> com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) >>>>>>>> ~[?:?] 
>>>>>>>> at >>>>>>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) >>>>>>>> ~[?:?] >>>>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result >>>>>>>> ... many more >>>>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result >>>>>>>> ... many more >>>>>>>> Caused by: java.util.concurrent.ExecutionException: >>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution >>>>>>>> failed. >>>>>>>> ... 7 more >>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: >>>>>>>> Job execution failed. >>>>>>>> ... many more >>>>>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is >>>>>>>> suppressed by NoRestartBackoffTimeStrategy >>>>>>>> ... many more >>>>>>>> Caused by: java.io.IOException: Failed to restore timer state >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> Caused by: java.lang.reflect.InvocationTargetException >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native >>>>>>>> Method) ~[?:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) >>>>>>>> ~[classes/:?] 
>>>>>>>> at >>>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) >>>>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> Caused by: java.lang.RuntimeException: Error while getting state >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native >>>>>>>> Method) ~[?:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) >>>>>>>> ~[classes/:?] 
>>>>>>>> at >>>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) >>>>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> Caused by: org.apache.flink.util.StateMigrationException: The new >>>>>>>> state serializer >>>>>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a) >>>>>>>> must not be incompatible with the old state serializer >>>>>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44 >>>>>>>> ). 
>>>>>>>> at >>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858) >>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763) >>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020) >>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007) >>>>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) >>>>>>>> ~[classes/:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native >>>>>>>> Method) ~[?:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) >>>>>>>> ~[?:?] >>>>>>>> at >>>>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) >>>>>>>> ~[classes/:?] 
>>>>>>>> at >>>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) >>>>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) >>>>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT] >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) >>>>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>>
>>>>>>>> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>
>>>>>>>>> Hi Gabor,
>>>>>>>>>
>>>>>>>>> Trying to, but I struggle to compile my stuff against your Flink build... I tried to apply your PR as a patch on my 1.20 modified fork and that didn't go well either. It will take time to untangle.
>>>>>>>>>
>>>>>>>>> Will keep you updated if I make progress.
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>
>>>>>>>>>> FYI, I've just opened this [1] PR to address the issue in a clean way. May I ask you to test it on your side?
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/flink/pull/26134
>>>>>>>>>>
>>>>>>>>>> BR,
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just a little update on this. We've made our first POC with the redesigned approach and the numbers are promising :) It still requires huge efforts from a development/correctness/performance perspective, but it seems we have something in the pocket.
>>>>>>>>>>>
>>>>>>>>>>> Test data: 256Mb state file with a single operator and 2 value states
>>>>>>>>>>> - Old execution time: 25M27.126737S
>>>>>>>>>>> - New execution time: 1M19.602042S
>>>>>>>>>>> In short: ~95% performance gain.
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> In short, when you don't care about multiple KeyedStateReaderFunction.readKey calls, then you're on the safe side.
>>>>>>>>>>>>
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am still hoping that I am good. I just read the savepoint to extract information (parallelism 1, and only 1 task manager). I also know it has been created by a job using a HashMap backend. And I do not care about duplicates.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I should still be good, right? From what I saw, I never read any duplicate keys.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Guys,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We've just had an in-depth analysis and we think that removing that particular line causes correctness issues under some circumstances.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Namely, key duplicates can happen when multiple column families are processed at the same time. Needless to say, that would cause multiple `readKey` calls, which ends up in plainly wrong calculation logic (for example in a simple sum calculation).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have a vision of how this can be solved in a clean way, but it will take some time.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Are there any plans on a migration guide or something for users to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The gap between the two approaches is quite huge, and considering the actual bugs and improvement possibilities in the state processor API, I would say this can come later, at least on my plate. When you see the gaps and you know how to fill them, feel free to contribute and we can shepherd the PRs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks both for your work on this!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On a related note, since Queryable State (QS) is going away soon, streamlining the State Processor API as much as possible makes a lot of sense.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Are there any plans on a migration guide or something for users to adapt their QS observers (beyond the current docs)? (State-)Observability-wise, Flink has some room for improvement, I would say.
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Salva
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your time investment and for sharing the numbers; it's super helpful. Ping me any time when you have further info to share.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> About the numbers: 48 minutes for 6Gb is not good, but not terrible. I've seen petabyte-scale states, so I'm pretty sure we need to go beyond...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Since we measure similar numbers with the unpatched Flink, and this has been reported by several users, we must make changes in this area. It's still a question whether the tested patch is the right approach, but at least we've touched the root cause.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The next step on my side is to have a deep dive and understand all the aspects of why the remove is there, how the remove elimination would affect existing use-cases, and consider all other possibilities.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I finally got to run that change through. I have a 6Gb savepoint I read and parse for reference.
>>>>>>>>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
>>>>>>>>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less than 2Gb)
>>>>>>>>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after 12 hours... (I gave up)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't think I have any duplicates because the application that generates the savepoint is using HashMap, so my scenario may not be representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> That was run on a VM on my laptop, so not exactly a controlled environment, but I think it's conclusive enough. I will need to run further tests, but I think we will patch our Flink, using a system property to configure it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hope this helps
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Just to give an update: I've applied the mentioned patch and the execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I need to double check what that would mean for correctness and all other aspects.
>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Please report back on how the patch behaves, including any side effects.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Now I'm testing the state reading with the processor API vs the mentioned job where we control the keys. The difference is extreme, especially because the numbers are coming from reading a ~40Mb state file 😅
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Needless to say, the bigger number is the processor API.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> That's a good idea. Sadly, I have no control over the keys...
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109 <https://issues.apache.org/jira/browse/FLINK-37109> first to see how that goes. If that brings RocksDB performance into an acceptable range for us, we might go that way. I really like the light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What I could imagine is to create a normal Flink job, use execution.state-recovery.path=/path/to/savepoint, and set the operator UID on a custom-written operator, which opens the state info for you (a rough sketch of this idea appears further below).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The only drawback is that you must know the keyBy range... this can be problematic, but if you can do it, it's a win :)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in pages, instead of as a single blob up front, which I think is what the hashmap does... we just want to be called for each entry and extract the bit we want in that scenario.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thank you for the insight. It saves me a lot of hunting for nothing.
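>>>>>>>>>>>>>>>>>>>>>> For the archive, a rough sketch of the throwaway-job idea described above. This is untested; the UID "main-operator", the MapState descriptor, and the enumerated keys are placeholders for whatever the real job uses, and I believe the ignore-unclaimed-state switch is needed whenever the savepoint contains more operators than the dump job:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> ```java
>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.api.common.state.MapState;
>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.api.common.state.MapStateDescriptor;
>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> public class StateDumpJob {
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>>>>>>>>>         Configuration conf = new Configuration();
>>>>>>>>>>>>>>>>>>>>>>         // Restore the throwaway job from the savepoint we want to inspect.
>>>>>>>>>>>>>>>>>>>>>>         conf.setString("execution.state-recovery.path", "/path/to/savepoint");
>>>>>>>>>>>>>>>>>>>>>>         // The savepoint holds state for operators this job does not have.
>>>>>>>>>>>>>>>>>>>>>>         conf.setString("execution.state-recovery.ignore-unclaimed-state", "true");
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
>>>>>>>>>>>>>>>>>>>>>>         env.setParallelism(1);
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>         env.fromData("key-1", "key-2", "key-3") // the known keyBy range; this is the hard part
>>>>>>>>>>>>>>>>>>>>>>                 .keyBy(k -> k)
>>>>>>>>>>>>>>>>>>>>>>                 .process(new DumpStateFn())
>>>>>>>>>>>>>>>>>>>>>>                 .uid("main-operator") // must match the UID used by the original job
>>>>>>>>>>>>>>>>>>>>>>                 .print();
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>         env.execute("state-dump");
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     /** Reads back the restored keyed state for each incoming key. */
>>>>>>>>>>>>>>>>>>>>>>     static class DumpStateFn extends KeyedProcessFunction<String, String, String> {
>>>>>>>>>>>>>>>>>>>>>>         private transient MapState<String, String> state;
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>>>>>>         public void open(Configuration parameters) {
>>>>>>>>>>>>>>>>>>>>>>             state = getRuntimeContext().getMapState(
>>>>>>>>>>>>>>>>>>>>>>                     new MapStateDescriptor<>("my-state", String.class, String.class));
>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>>>>>>         public void processElement(String key, Context ctx, Collector<String> out) throws Exception {
>>>>>>>>>>>>>>>>>>>>>>             for (java.util.Map.Entry<String, String> e : state.entries()) {
>>>>>>>>>>>>>>>>>>>>>>                 out.collect(key + " -> " + e.getKey() + " = " + e.getValue());
>>>>>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>> ```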
>>>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not reaching the performance criteria it should. There is an open issue for it [1]. The hashmap-based approach has always required more memory, and it still does. So if a small memory footprint is a hard requirement, then RocksDB is the only way for now.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Bad to say, but I can't suggest any nifty trick to make it better. All I can promise is that I'm now measuring the performance of the RocksDB approach and intend to eliminate the slowness. Since we don't know what exactly causes the slowness, the new FRocksDB-8.10.0 can also be an improvement.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and minimise the memory consumption? We just need to transform it into something else for investigation.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is spread over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint can be 4-5Gb or more.
>>>>>>>>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>>>>>>>>     // ... extract what we need here
>>>>>>>>>>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works and is low on memory usage, but is very, very slow. We noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That is very fast, as expected. However, the iterator apparently only returns once the whole savepoint has been loaded into the HashMap, so memory consumption is heavy.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it does not consume all the memory? Or maybe read it in parts...
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> JM