Hi Jean-Marc,

FYI the changes are merged into master and release-1.20, so feel free to use them.
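For reference, reading keyed state through the state processor API with the RocksDB backend now looks roughly like the sketch below. The savepoint path, operator UID, state name and types are placeholders that must match your job (these are the 1.20 package names; the RocksDB classes moved in 2.x):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

public class SavepointDump {

    // Placeholder reader: emits "key=value" for a single ValueState; the
    // state name and types must match what the original job registered.
    static class MyReader extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + "=" + state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // RocksDB keeps the memory footprint low, and with the fix merged the
        // key iteration is no longer the bottleneck it used to be.
        SavepointReader savepoint =
                SavepointReader.read(env, "/path/to/savepoint", new EmbeddedRocksDBStateBackend());

        // Stream the entries instead of materializing everything up front.
        try (CloseableIterator<String> it =
                savepoint
                        .readKeyedState(OperatorIdentifier.forUid("my-operator-uid"), new MyReader())
                        .executeAndCollect()) {
            it.forEachRemaining(System.out::println);
        }
    }
}
```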
In terms of state migration from 1.x to 2.x, this can be challenging for the whole community, so if you have some info please share.

G

On Wed, Feb 12, 2025 at 2:40 PM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:

> Hi Gabor
>
> Glad we helped, but thanks to you for working on this. The performance
> improvement you made is fantastic.
>
> I think on our side we will wait for the final official patch, and then
> decide if we move up to 1.20.1 (or whatever includes it) or just apply the
> patch on what we have. Our Flink is heavily customized on top of 1.20 for
> HA configuration and priority preferences in the TwoInputStreamOperator.
>
> As part of this we learnt we will have to find a good way to handle state
> migration when moving up to Flink 2.x, so that was a good learning
> exercise for us.
>
> Kind regards
>
> JM
>
>
> On Wed, Feb 12, 2025 at 12:57 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> Hi Jean-Marc,
>>
>> Thanks for your efforts! We've done quite extensive tests internally and
>> they are all passing. Good to see that the number of keys matches on your
>> side too.
>> Regarding key ordering, the state processor API does not give any
>> guarantees in terms of ordering.
>>
>> All in all I'm confident with the solution, so green light from my side.
>> I intend to keep the PR open for some time to give everybody the chance
>> to make it better or raise any concerns.
>>
>> BR,
>> G
>>
>>
>> On Wed, Feb 12, 2025 at 1:21 PM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>
>>> Hi Gabor
>>>
>>> I see all my 1,331,301 keys with your patch, which is exactly what I am
>>> expecting. If you have more specific concerns I suppose I can instrument
>>> my code further. I have no expectation regarding the key ordering.
>>>
>>> JM
>>>
>>>
>>> On Wed, Feb 12, 2025 at 11:29 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> I think the hashmap vs patched RocksDB comparison makes sense; at least
>>>> I'm measuring similar numbers with relatively small states.
>>>> RocksDB with the remove() commented out is surprisingly high, but since
>>>> it's causing correctness issues under some circumstances I would abandon
>>>> that.
>>>>
>>>> > I probably need more time to ensure we definitely see all the keys,
>>>> but that looks very good.
>>>>
>>>> Yeah, correctness is key here, so waiting on your results.
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Feb 12, 2025 at 11:56 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> I applied your 1.20 patch, and I got some very good numbers from it...
>>>>> so for my 5GB savepoint, I made sure I skip all my code overhead to get
>>>>> the raw numbers, and I can read it in:
>>>>> - HashMap: 4 minutes
>>>>> - RocksDB with your patch: ~19 minutes
>>>>> - RocksDB with the remove() commented out: 49 minutes
>>>>>
>>>>> I am not sure these numbers make sense, and this is on a VM on my
>>>>> laptop, so not exactly a good performance testing environment, but the
>>>>> numbers I see are pretty good. I probably need more time to ensure we
>>>>> definitely see all the keys, but that looks very good.
>>>>>
>>>>> Hope this helps
>>>>>
>>>>> JM
>>>>>
>>>>>
>>>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: (run 1/2)
>>>>> ```
>>>>> 2025-02-12 08:22:59,282 INFO LifecycleStateExplorerTest - Starting now
>>>>> 2025-02-12 08:22:59,283 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint16385495638880357651.
>>>>> 2025-02-12 08:41:29,066 INFO LifecycleStateExplorerTest - Completed now
>>>>> 2025-02-12 08:41:29,066 INFO LifecycleStateExplorerTest - Duration: PT18M29.783388324S ms
>>>>> ```
>>>>>
>>>>> #### using our Flink 1.20 build, which removes the keysAndNamespaces.remove() line
>>>>> ```
>>>>> 2025-02-12 09:03:39,018 INFO LifecycleStateExplorerTest - Starting now
>>>>> 2025-02-12 09:03:39,024 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint10990862094634162213.
>>>>> 2025-02-12 09:52:51,811 INFO LifecycleStateExplorerTest - Completed now
>>>>> 2025-02-12 09:52:51,813 INFO LifecycleStateExplorerTest - Duration: PT49M12.788979538S ms
>>>>> ```
>>>>>
>>>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: (run 2/2)
>>>>> ```
>>>>> 2025-02-12 10:12:26,453 INFO LifecycleStateExplorerTest - Starting now
>>>>> 2025-02-12 10:12:26,458 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint7967784813864743408.
>>>>> 2025-02-12 10:32:20,215 INFO LifecycleStateExplorerTest - Completed now
>>>>> 2025-02-12 10:32:20,215 INFO LifecycleStateExplorerTest - Duration: PT19M53.757274969S ms
>>>>> ```
>>>>>
>>>>> #### using HashMapMemoryBackEnd (for reference)
>>>>> ```
>>>>> 2025-02-12 10:39:03,618 INFO LifecycleStateExplorerTest - Starting now
>>>>> 2025-02-12 10:39:03,622 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint14340081990006677909.
>>>>> 2025-02-12 10:43:16,454 INFO LifecycleStateExplorerTest - Completed now
>>>>> 2025-02-12 10:43:16,457 INFO LifecycleStateExplorerTest - Duration: PT4M12.832810524S ms
>>>>> ```
>>>>>
>>>>>
>>>>> On Tue, Feb 11, 2025 at 3:39 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Yeah, I think this is more likely the 1.x vs 2.x incompatibility.
>>>>>> I've just opened the PR against 1.20, which you can cherry-pick here [1].
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/26145
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>
>>>>>>> Hi Gabor,
>>>>>>>
>>>>>>> So, a bit of progress.
>>>>>>>
>>>>>>> I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit
>>>>>>> of chopping around deprecated/changed and removed APIs - that wasn't too
>>>>>>> bad), but it failed to read the state I was using before (which was
>>>>>>> generated with Flink 1.20). This is the stack trace I get. I suspect this
>>>>>>> has more to do with state compatibility between 1.20 and 2.1... I was
>>>>>>> surprised the error is against a timer state. The end of the stack trace
>>>>>>> is below.
>>>>>>>
>>>>>>> I will try to apply your patch/change/PR to our 1.20 build, but it's
>>>>>>> not a simple git apply 😭😭😭.
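>>>>>>> (The PR is against master, where the RocksDB state backend classes
>>>>>>> moved from org.apache.flink.contrib.streaming.state to
>>>>>>> org.apache.flink.state.rocksdb, so several of the touched paths don't
>>>>>>> exist in a 1.20 tree at all; even git apply --3way has nothing to
>>>>>>> anchor on.)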
>>>>>>>
>>>>>>> $ git apply --check ~/Downloads/26134.patch.txt
>>>>>>> error: patch failed: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
>>>>>>> error: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java: patch does not apply
>>>>>>> error: patch failed: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
>>>>>>> error: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java: patch does not apply
>>>>>>> error: flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java: No such file or directory
>>>>>>> error: flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java: No such file or directory
>>>>>>> error: patch failed: flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
>>>>>>> error: flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java: patch does not apply
>>>>>>> error: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java: No such file or directory
>>>>>>> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java: No such file or directory
>>>>>>> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java: No such file or directory
>>>>>>> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java: No such file or directory
>>>>>>>
>>>>>>>
>>>>>>> stack trace if relevant...
>>>>>>>
>>>>>>> java.util.concurrent.CompletionException: java.lang.RuntimeException: Failed to fetch next result
>>>>>>> at com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88) ~[classes/:?]
>>>>>>> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
>>>>>>> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760) ~[?:?]
>>>>>>> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[?:?]
>>>>>>> at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[?:?]
>>>>>>> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[?:?]
>>>>>>> at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[?:?]
>>>>>>> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[?:?]
>>>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>>>> ... many more
>>>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>>>> ... many more
>>>>>>> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>> ... 7 more
>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>> ... many more
>>>>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>>>>> ... many more
>>>>>>> Caused by: java.io.IOException: Failed to restore timer state
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>>>>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>>>>>>> at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>>>>>>> at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>>>>>>> at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>>>>>>> at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>>>>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) ~[classes/:?]
>>>>>>> at com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) ~[classes/:?]
>>>>>>> at com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) ~[classes/:?]
>>>>>>> at com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) ~[classes/:?]
>>>>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>>>>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>>>>>>> at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>>>>>>> at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>>>>>>> at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>>>>>>> at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>>>>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44).
>>>>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) ~[classes/:?]
>>>>>>> at com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) ~[classes/:?]
>>>>>>> at com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) ~[classes/:?]
>>>>>>> at com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) ~[classes/:?]
>>>>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>>>>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>>>>>>> at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>>>>>>> at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>>>>>>> at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>>>>>>> at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>>>>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi Gabor,
>>>>>>>>
>>>>>>>> Trying to, but I'm struggling to compile my stuff against your Flink
>>>>>>>> build... I tried to apply your PR as a patch on my modified 1.20 fork
>>>>>>>> and that didn't go well either. It will take time to untangle.
>>>>>>>>
>>>>>>>> Will keep you updated if I make progress.
>>>>>>>>
>>>>>>>> JM
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>
>>>>>>>>> FYI, I've just opened this [1] PR to address the issue in a clean way.
>>>>>>>>> May I ask you to test it on your side?
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/flink/pull/26134
>>>>>>>>>
>>>>>>>>> BR,
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Just a little update on this. We've made our first POC with the
>>>>>>>>>> redesigned approach and the numbers are promising :)
>>>>>>>>>> It still requires huge effort from a development/correctness/performance
>>>>>>>>>> perspective, but it seems we have something in our pocket.
>>>>>>>>>>
>>>>>>>>>> Test data: 256MB state file with a single operator and 2 value states
>>>>>>>>>> - Old execution time: 25M27.126737S
>>>>>>>>>> - New execution time: 1M19.602042S
>>>>>>>>>> In short: ~95% performance gain.
>>>>>>>>>>
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In short, when you don't care about multiple
>>>>>>>>>>> KeyedStateReaderFunction.readKey calls, then you're on the safe side.
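>>>>>>>>>>> For example (schematic; the state handle and names are made up), a
>>>>>>>>>>> readKey that only extracts and emits is fine even if it fires twice for
>>>>>>>>>>> the same key, while anything that accumulates across calls would double
>>>>>>>>>>> count:
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> @Override
>>>>>>>>>>> public void readKey(String key, Context ctx, Collector<String> out)
>>>>>>>>>>>         throws Exception {
>>>>>>>>>>>     // Safe: pure extraction; a duplicate call only emits a duplicate
>>>>>>>>>>>     // row, which can be deduped downstream.
>>>>>>>>>>>     out.collect(key + "=" + countState.value());
>>>>>>>>>>>
>>>>>>>>>>>     // Unsafe: cross-call accumulation, e.g. a field like
>>>>>>>>>>>     // totalSum += countState.value();
>>>>>>>>>>>     // because a duplicate call silently inflates the total.
>>>>>>>>>>> }
>>>>>>>>>>> ```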
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I am still hoping that I am good. I just read the savepoint to
>>>>>>>>>>>> extract information (parallelism 1, and only 1 task manager).
>>>>>>>>>>>> I also know it has been created by a job using a HashMap backend.
>>>>>>>>>>>> And I do not care about duplicates.
>>>>>>>>>>>>
>>>>>>>>>>>> I should still be good, right? From what I saw, I never read any
>>>>>>>>>>>> duplicate keys.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> JM
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Guys,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We've just had an in-depth analysis and we think that removing
>>>>>>>>>>>>> that particular line causes correctness issues under some circumstances.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Namely, key duplicates can happen when multiple column families
>>>>>>>>>>>>> are processed at the same time. Needless to say, that would cause
>>>>>>>>>>>>> multiple `readKey` calls, which ends up in plain wrong calculation logic
>>>>>>>>>>>>> (for example in a simple sum calculation).
>>>>>>>>>>>>>
>>>>>>>>>>>>> We have a vision of how this can be solved in a clean way, but it
>>>>>>>>>>>>> will take some time.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Are there any plans for a migration guide or something for
>>>>>>>>>>>>> users to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> The gap between the two approaches is quite huge, and considering
>>>>>>>>>>>>> the actual bugs and improvement possibilities in the state processor API
>>>>>>>>>>>>> I would say this can come later, at least on my plate. When you see the
>>>>>>>>>>>>> gaps and you know how to fill them, feel free to contribute and we can
>>>>>>>>>>>>> shepherd the PRs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> G
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks both for your work on this!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On a related note, since Queryable State (QS) is going away
>>>>>>>>>>>>>> soon, streamlining the State Processor API as much as possible makes a
>>>>>>>>>>>>>> lot of sense.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are there any plans for a migration guide or something for
>>>>>>>>>>>>>> users to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>>>>> (State-)Observability-wise, Flink has some room for improvement, I
>>>>>>>>>>>>>> would say.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Salva
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your time investment and for sharing the numbers,
>>>>>>>>>>>>>>> it's super helpful.
>>>>>>>>>>>>>>> Ping me any time when you have further info to share.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> About the numbers: 48 minutes for 6GB is not good but not terrible.
>>>>>>>>>>>>>>> I've seen petabyte-scale states, so I'm pretty sure we need to go
>>>>>>>>>>>>>>> beyond...
>>>>>>>>>>>>>>> Since we measure similar numbers with the unpatched Flink, plus
>>>>>>>>>>>>>>> this has been reported by several users, we must make changes in this
>>>>>>>>>>>>>>> area. It's still a question whether the tested patch is the right
>>>>>>>>>>>>>>> approach, but at least we've touched the root cause.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The next step on my side is to do a deep dive and understand all the
>>>>>>>>>>>>>>> aspects of why the remove is there, how eliminating the remove would
>>>>>>>>>>>>>>> affect existing use-cases, and consider all other possibilities.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I finally got to run that change through. I have a 6GB savepoint
>>>>>>>>>>>>>>>> I read and parse for reference.
>>>>>>>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10GB of RAM)
>>>>>>>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less than 2GB)
>>>>>>>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after 12 hours... (I gave up)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't think I have any duplicates because the application that
>>>>>>>>>>>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>>>>>>>>>>>> representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That was run on a VM on my laptop, so not exactly a controlled
>>>>>>>>>>>>>>>> environment, but I think it's conclusive enough. I will need to run
>>>>>>>>>>>>>>>> further tests, but I think we will patch our Flink, using a system
>>>>>>>>>>>>>>>> property to configure it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hope this helps
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>>>>>>>>>>>> execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I need to double-check what that would mean for correctness and
>>>>>>>>>>>>>>>>> all other aspects.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Please report back on how the patch behaves, including any side
>>>>>>>>>>>>>>>>>> effects.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Now I'm testing the state reading with the processor API vs the
>>>>>>>>>>>>>>>>>> mentioned job where we control the keys.
>>>>>>>>>>>>>>>>>> The difference is extreme, especially because the numbers are
>>>>>>>>>>>>>>>>>> coming from reading a ~40MB state file 😅
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Needless to say, the bigger number is the processor API.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first to see
>>>>>>>>>>>>>>>>>>> how that goes. If that brings RocksDB performance into an
>>>>>>>>>>>>>>>>>>> acceptable range for us, we might go that way. I really like the
>>>>>>>>>>>>>>>>>>> light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What I could imagine is to create a normal Flink job, use
>>>>>>>>>>>>>>>>>>>> execution.state-recovery.path=/path/to/savepoint, and set the
>>>>>>>>>>>>>>>>>>>> operator UID on a custom-written operator, which opens the state
>>>>>>>>>>>>>>>>>>>> info for you.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The only drawback is that you must know the keyBy range... this
>>>>>>>>>>>>>>>>>>>> can be problematic, but if you can do it, it's a win :)
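>>>>>>>>>>>>>>>>>>>> Roughly this kind of skeleton (untested; the key range, state
>>>>>>>>>>>>>>>>>>>> name/type and UID are placeholders that must match the original job):
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>>>>>>>>>>>>>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>>>>>>>>>>>>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>>>>>>>>>>>>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> public class StateDumpJob {
>>>>>>>>>>>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>>         long maxKey = 1_000_000; // placeholder: the known keyBy range
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>         env.fromSequence(0, maxKey)
>>>>>>>>>>>>>>>>>>>>                 .keyBy(k -> k)
>>>>>>>>>>>>>>>>>>>>                 .process(new KeyedProcessFunction<Long, Long, String>() {
>>>>>>>>>>>>>>>>>>>>                     private transient ValueState<String> state;
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>                     @Override
>>>>>>>>>>>>>>>>>>>>                     public void open(Configuration conf) {
>>>>>>>>>>>>>>>>>>>>                         // Name and type must match the original job's descriptor.
>>>>>>>>>>>>>>>>>>>>                         state = getRuntimeContext().getState(
>>>>>>>>>>>>>>>>>>>>                                 new ValueStateDescriptor<>("my-state", String.class));
>>>>>>>>>>>>>>>>>>>>                     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>                     @Override
>>>>>>>>>>>>>>>>>>>>                     public void processElement(Long key, Context ctx, Collector<String> out)
>>>>>>>>>>>>>>>>>>>>                             throws Exception {
>>>>>>>>>>>>>>>>>>>>                         String value = state.value(); // restored from the savepoint
>>>>>>>>>>>>>>>>>>>>                         if (value != null) {
>>>>>>>>>>>>>>>>>>>>                             out.collect(key + " -> " + value);
>>>>>>>>>>>>>>>>>>>>                         }
>>>>>>>>>>>>>>>>>>>>                     }
>>>>>>>>>>>>>>>>>>>>                 })
>>>>>>>>>>>>>>>>>>>>                 .uid("main-operator") // must match the UID in the savepoint
>>>>>>>>>>>>>>>>>>>>                 .print();
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>         // Submit with execution.state-recovery.path=/path/to/savepoint
>>>>>>>>>>>>>>>>>>>>         // so the operator starts from the restored state.
>>>>>>>>>>>>>>>>>>>>         env.execute("state-dump");
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>> ```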
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>>>>>>>>>>>>> pages, instead of as a single blob up front, which I think is
>>>>>>>>>>>>>>>>>>>>> what the hashmap does... we just want to be called for each
>>>>>>>>>>>>>>>>>>>>> entry and extract the bit we want in that scenario.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Never mind.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not reaching
>>>>>>>>>>>>>>>>>>>>>> the performance level it should. There is an open issue for it [1].
>>>>>>>>>>>>>>>>>>>>>> The hashmap-based approach has always required more memory. So if
>>>>>>>>>>>>>>>>>>>>>> a low memory footprint is a hard requirement, then RocksDB is the
>>>>>>>>>>>>>>>>>>>>>> only way now.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Sad to say, I can't suggest any nifty trick to make it better.
>>>>>>>>>>>>>>>>>>>>>> All I can promise is that I'm now measuring the performance of the
>>>>>>>>>>>>>>>>>>>>>> RocksDB approach and intend to eliminate the slowness. Since we
>>>>>>>>>>>>>>>>>>>>>> don't know what exactly causes the slowness, the new
>>>>>>>>>>>>>>>>>>>>>> FRocksDB-8.10.0 can also be an improvement.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and minimise
>>>>>>>>>>>>>>>>>>>>>>> the memory consumption? We just need to transform it into
>>>>>>>>>>>>>>>>>>>>>>> something else for investigation.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is
>>>>>>>>>>>>>>>>>>>>>>> spread over 6 task slots in 6 pods (under k8s). Savepoints are
>>>>>>>>>>>>>>>>>>>>>>> saved on S3. A savepoint can be 4-5GB or more.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution environment.
>>>>>>>>>>>>>>>>>>>>>>> This is essentially what we are doing:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>>>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>>>>>>>     ... // extract what we need here
>>>>>>>>>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works
>>>>>>>>>>>>>>>>>>>>>>> and is low on memory usage, but is very, very slow. We noticed the
>>>>>>>>>>>>>>>>>>>>>>> iterator is available very early on, but it is slow...
>>>>>>>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That
>>>>>>>>>>>>>>>>>>>>>>> is very fast, as expected.
>>>>>>>>>>>>>>>>>>>>>>> However, the iterator apparently only returns once the whole
>>>>>>>>>>>>>>>>>>>>>>> savepoint has been loaded into the HashMap, hence the heavy
>>>>>>>>>>>>>>>>>>>>>>> memory consumption.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it
>>>>>>>>>>>>>>>>>>>>>>> does not consume all the memory? Or maybe a way to read it in
>>>>>>>>>>>>>>>>>>>>>>> parts...
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> JM