Yeah, I think this is more like the 1.x and 2.x incompatibility. I've just opened the PR against 1.20, which you can cherry-pick here [1]. If `git apply` keeps fighting you, it should be enough to fetch the PR head and cherry-pick it, something like `git fetch https://github.com/apache/flink.git pull/26145/head` followed by `git cherry-pick FETCH_HEAD`.
[1] https://github.com/apache/flink/pull/26145

BR,
G

On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:

> Hi Gabor,
>
> So, a bit of progress.
>
> I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit of
> chopping around deprecated/changed and removed APIs - that wasn't too bad),
> but it failed to read the state I was using before (which was generated
> with Flink 1.20). This is the stack trace I get. I suspect this has more
> to do with state compatibility between 1.20 and 2.1... I was surprised the
> error is against a timer state. The end of the stack trace is below.
>
> I will try to apply your patch/change/PR to our 1.20 build, but it's not a
> simple git apply 😭😭😭.
>
> $ git apply --check ~/Downloads/26134.patch.txt
> error: patch failed: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
> error: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java: patch does not apply
> error: patch failed: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
> error: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java: patch does not apply
> error: flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java: No such file or directory
> error: flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java: No such file or directory
> error: patch failed: flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
> error: flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java: patch does not apply
> error: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java: No such file or directory
> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java: No such file or directory
> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java: No such file or directory
> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java: No such file or directory
>
> Stack trace, if relevant:
>
> java.util.concurrent.CompletionException: java.lang.RuntimeException: Failed to fetch next result
>     at com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88) ~[classes/:?]
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760) ~[?:?]
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[?:?]
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[?:?]
>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[?:?]
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[?:?]
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[?:?]
> Caused by: java.lang.RuntimeException: Failed to fetch next result
>     ... many more
> Caused by: java.lang.RuntimeException: Failed to fetch next result
>     ... many more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     ... 7 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     ... many more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     ... many more
> Caused by: java.io.IOException: Failed to restore timer state
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
> Caused by: java.lang.reflect.InvocationTargetException
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>     at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>     at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>     at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>     at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
> Caused by: java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) ~[classes/:?]
>     at com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) ~[classes/:?]
>     at com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) ~[classes/:?]
>     at com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) ~[classes/:?]
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>     at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>     at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>     at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>     at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44).
>     at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) ~[classes/:?]
>     at com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) ~[classes/:?]
>     at com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) ~[classes/:?]
>     at com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) ~[classes/:?]
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>     at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>     at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>     at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>     at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>     at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
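For what it's worth, the last `Caused by` above is the interesting one: RocksDB rebuilds the "old" serializer from the snapshot stored in the savepoint and compares it against the "new" one derived from the descriptor your reader registers in open() (the getMapState call in the trace). A minimal sketch, with a placeholder state name and types (not your actual ones), of the descriptor that has to line up:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;

public class DescriptorMustMatch {
    // Placeholder name and types. The "old" serializer is rebuilt from the
    // snapshot in the savepoint; the "new" one is derived from this descriptor
    // at read time. If the two resolve as incompatible (e.g. a value class
    // changed shape, moved packages, or switched serialization between the
    // 1.20 and 2.1 builds), the restore aborts with the StateMigrationException
    // above before a single key is read.
    static final MapStateDescriptor<String, String> POLICY_STATE =
            new MapStateDescriptor<>("policyExecutionState", Types.STRING, Types.STRING);
}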
> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi Gabor,
>>
>> Trying to, but I struggle to compile my stuff against your Flink build...
>> I tried to apply your PR as a patch on my 1.20 modified fork, and that
>> didn't go well either. It will take time to untangle.
>>
>> Will keep you updated if I make progress.
>>
>> JM
>>
>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Jean-Marc,
>>>
>>> FYI, I've just opened this [1] PR to address the issue in a clean way.
>>> May I ask you to test it on your side?
>>>
>>> [1] https://github.com/apache/flink/pull/26134
>>>
>>> BR,
>>> G
>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Just a little update on this. We've made our first POC with the
>>>> redesigned approach and the numbers are promising :)
>>>> It still requires a huge effort from a development/correctness/performance
>>>> perspective, but it seems we have something in the pocket.
>>>>
>>>> Test data: 256Mb state file with a single operator and 2 value states
>>>> - Old execution time: 25M27.126737S
>>>> - New execution time: 1M19.602042S
>>>> In short: ~95% performance gain.
>>>>
>>>> G
>>>>
>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> In short: if you don't care about multiple
>>>>> KeyedStateReaderFunction.readKey calls, then you're on the safe side.
>>>>>
>>>>> G
>>>>>
>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>
>>>>>> I am hoping that I am still good. I just read the savepoint to
>>>>>> extract information (parallelism 1, and only 1 task manager). I also know
>>>>>> it has been created by a job using a HashMap backend. And I do not care
>>>>>> about duplicates.
>>>>>>
>>>>>> I should still be good, right? From what I saw, I never read any
>>>>>> duplicate keys.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> JM
>>>>>>
>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> We've just had an in-depth analysis, and we think that removing that
>>>>>>> particular line causes correctness issues under some circumstances.
>>>>>>>
>>>>>>> Namely, key duplicates can happen when multiple column families are
>>>>>>> processed at the same time. Needless to say, that would cause multiple
>>>>>>> `readKey` calls, which ends up in plain wrong calculation logic (for
>>>>>>> example in a simple sum calculation; see the sketch below).
>>>>>>>
>>>>>>> We have a vision of how this can be solved in a clean way, but it will
>>>>>>> take some time.
>>>>>>>
>>>>>>>> Are there any plans for a migration guide or something for users to
>>>>>>>> adapt their QS observers (beyond the current docs)?
>>>>>>>
>>>>>>> The gap between the two approaches is quite huge, and considering
>>>>>>> the actual bugs and improvement possibilities in the State Processor API,
>>>>>>> I would say this can come later, at least on my plate. When you see the
>>>>>>> gaps and you know how to fill them, feel free to contribute and we can
>>>>>>> shepherd the PRs.
>>>>>>>
>>>>>>> G
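To make that correctness point concrete, this is roughly the shape of reader that would be affected (state name and types are placeholders, not from any job in this thread):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class CountReaderFunction extends KeyedStateReaderFunction<String, Long> {

    private ValueState<Long> count; // hypothetical per-key counter state

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
        // This assumes readKey fires exactly once per key. If the backend
        // surfaced the same key once per column family instead, this value
        // would be emitted twice and any downstream sum over the emitted
        // stream would simply be wrong.
        out.collect(count.value());
    }
}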
>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks both for your work on this!
>>>>>>>>
>>>>>>>> On a related note, since Queryable State (QS) is going away soon,
>>>>>>>> streamlining the State Processor API as much as possible makes a lot
>>>>>>>> of sense.
>>>>>>>>
>>>>>>>> Are there any plans for a migration guide or something for users to
>>>>>>>> adapt their QS observers (beyond the current docs)?
>>>>>>>> (State-)Observability-wise, Flink has some room for improvement, I
>>>>>>>> would say.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Salva
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>
>>>>>>>>> Thanks for investing the time and sharing the numbers; it's super
>>>>>>>>> helpful. Ping me any time when you have further info to share.
>>>>>>>>>
>>>>>>>>> About the numbers: 48 minutes for 6Gb is not good, but not terrible.
>>>>>>>>> I've seen petabyte-scale states, so I'm pretty sure we need to go
>>>>>>>>> beyond...
>>>>>>>>>
>>>>>>>>> Since we measure similar numbers with the unpatched Flink, and this
>>>>>>>>> has been reported by several users, we must make changes in this
>>>>>>>>> area. It's still a question whether the tested patch is the right
>>>>>>>>> approach, but at least we've touched the root cause.
>>>>>>>>>
>>>>>>>>> The next step on my side is to have a deep dive and understand all
>>>>>>>>> the aspects of why the remove is there, how eliminating the remove
>>>>>>>>> would affect existing use-cases, and consider all other possibilities.
>>>>>>>>>
>>>>>>>>> BR,
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gabor,
>>>>>>>>>>
>>>>>>>>>> I finally got to run that change through. I have a 6Gb savepoint I
>>>>>>>>>> read and parse for reference:
>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less
>>>>>>>>>>   than 2Gb)
>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after 12
>>>>>>>>>>   hours... (I gave up)
>>>>>>>>>>
>>>>>>>>>> I don't think I have any duplicates, because the application that
>>>>>>>>>> generates the savepoint is using HashMap, so my scenario may not be
>>>>>>>>>> representative. I am using IBM Semeru Java 17 (OpenJ9 0.46).
>>>>>>>>>>
>>>>>>>>>> That was run on a VM on my laptop, so not exactly a controlled
>>>>>>>>>> environment, but I think it's conclusive enough. I will need to run
>>>>>>>>>> further tests, but I think we will patch our Flink, using a system
>>>>>>>>>> property to configure it.
>>>>>>>>>>
>>>>>>>>>> Hope this helps
>>>>>>>>>>
>>>>>>>>>> JM
>>>>>>>>>>
>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just to give an update. I've applied the mentioned patch and the
>>>>>>>>>>> execution time decreased drastically (the gain is 98.9%):
>>>>>>>>>>>
>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>>>
>>>>>>>>>>> I need to double-check what that would mean for correctness and
>>>>>>>>>>> all other aspects.
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Please report back on how the patch behaves, including any side
>>>>>>>>>>>> effects.
>>>>>>>>>>>>
>>>>>>>>>>>> Now I'm testing state reading with the processor API vs the
>>>>>>>>>>>> mentioned job where we control the keys. The difference is
>>>>>>>>>>>> extreme, especially because the numbers are coming from reading
>>>>>>>>>>>> a ~40Mb state file 😅
>>>>>>>>>>>>
>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>>>> ...
>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>>>
>>>>>>>>>>>> Needless to say, the bigger one is the processor API.
>>>>>>>>>>>>
>>>>>>>>>>>> G
>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> That's a good idea. Sadly, I have no control over the keys...
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109
>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first, to
>>>>>>>>>>>>> see how that goes. If that brings RocksDB performance into an
>>>>>>>>>>>>> acceptable range for us, we might go that way. I really like the
>>>>>>>>>>>>> light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What I could imagine is creating a normal Flink job, setting
>>>>>>>>>>>>>> execution.state-recovery.path=/path/to/savepoint, and setting
>>>>>>>>>>>>>> the operator UID on a custom-written operator, which opens the
>>>>>>>>>>>>>> state info for you (see the sketch below).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The only drawback is that you must know the keyBy range...
>>>>>>>>>>>>>> this can be problematic, but if you can do it, it's a win :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> G
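A minimal sketch of that job, assuming String keys; the uid "main-operator", the state name "count", the probe keys, and the StateDumper function are all placeholders you'd replace with your own (only the config key is the one named above):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StateDumpJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Start the job from the savepoint instead of reading it offline.
        conf.setString("execution.state-recovery.path", "/path/to/savepoint");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromData("key-1", "key-2")   // the keyBy range you must know up front
           .keyBy(k -> k)
           .process(new StateDumper())
           .uid("main-operator")         // must match the uid of the stateful operator
           .print();

        env.execute("state-dump");
    }

    /** Hypothetical operator: each probe element surfaces the restored state for its key. */
    static class StateDumper extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out)
                throws Exception {
            out.collect(value + " -> " + count.value()); // restored value for this key
        }
    }
}

Each incoming element is keyed into the restored backend, so sending one probe element per key dumps the state without materialising the whole savepoint in memory.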
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in
>>>>>>>>>>>>>>> pages, instead of as a single blob up front, which I think is
>>>>>>>>>>>>>>> what the hashmap does... we just want to be called for each
>>>>>>>>>>>>>>> entry and extract the bit we want in that scenario.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Never mind.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>>>>> nothing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>>>>> reaching the performance level it should. There is an open
>>>>>>>>>>>>>>>> issue for it [1]. The hashmap-based approach has always
>>>>>>>>>>>>>>>> required more memory, and always will. So if the memory
>>>>>>>>>>>>>>>> footprint is a hard requirement, then RocksDB is the only
>>>>>>>>>>>>>>>> way now.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Sadly, I can't suggest any nifty trick to make it better.
>>>>>>>>>>>>>>>> All I can promise is that I'm now measuring the performance
>>>>>>>>>>>>>>>> of the RocksDB approach and intend to eliminate the slowness.
>>>>>>>>>>>>>>>> Since we don't know what exactly causes the slowness, the new
>>>>>>>>>>>>>>>> FRocksDB 8.10.0 may also bring an improvement.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> All in all, it will take some time to sort this out.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>>>>> minimise memory consumption? We just need to transform it
>>>>>>>>>>>>>>>>> into something else for investigation.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend and
>>>>>>>>>>>>>>>>> is spread over 6 task slots in 6 pods (under k8s). Savepoints
>>>>>>>>>>>>>>>>> are saved on S3. A savepoint can be 4-5Gb or more.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>>     LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>>>     mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>     // extract what we need here
>>>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That
>>>>>>>>>>>>>>>>>   works and is low on memory usage, but is very, very slow.
>>>>>>>>>>>>>>>>>   We noticed the iterator is available very early on, but it
>>>>>>>>>>>>>>>>>   is slow...
>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend.
>>>>>>>>>>>>>>>>>   That is very fast, as expected. However, the iterator
>>>>>>>>>>>>>>>>>   apparently only returns once the whole savepoint has been
>>>>>>>>>>>>>>>>>   loaded into the HashMap, so heavy memory consumption.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that
>>>>>>>>>>>>>>>>> it does not consume all the memory? Or maybe reading it in
>>>>>>>>>>>>>>>>> parts...
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> JM
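One small aside on the snippet in the original mail: CloseableIterator is AutoCloseable, so it is worth closing it to shut the local job down cleanly. A hedged variant of the collection loop, assuming the surrounding method declares throws Exception:

try (CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
        mainOperatorState.executeAndCollect()) {
    stateReader.forEachRemaining(record -> {
        // extract what we need here
    });
}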