Hi Gabor, I see all my 1,331,301 keys with your patch, which is exactly what I am expecting. If you have more specific concerns I suppose I can instrument my code further. I have no expectation regarding the key ordering.
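For reference, here is a simplified sketch of the check I run (our real code differs; `KeyedStateItem` and its `getKey()` accessor stand in for our own types):

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.CloseableIterator;

// Count every record the reader emits, and also collect the keys into a set,
// so any duplicated key would show up as a gap between total and distinct.
static void countKeys(DataStream<KeyedStateItem> mainOperatorState) throws Exception {
    Set<String> seenKeys = new HashSet<>();
    long total = 0;
    try (CloseableIterator<KeyedStateItem> it = mainOperatorState.executeAndCollect()) {
        while (it.hasNext()) {
            seenKeys.add(it.next().getKey());
            total++;
        }
    }
    // With the patch both numbers come out at 1,331,301.
    System.out.printf("total=%d, distinct=%d%n", total, seenKeys.size());
}
```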
JM

On Wed, Feb 12, 2025 at 11:29 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I think the hashmap vs patched RocksDB makes sense, at least I'm measuring similar numbers with relatively small states.
> RocksDB with the remove() commented out is surprisingly high, but since it's causing correctness issues under some circumstances I would abandon that.
>
>> I probably need more time to ensure we definitely see all the keys, but that looks very good.
>
> Yeah, correctness is key here so I'm waiting on your results.
>
> BR,
> G
>
>
> On Wed, Feb 12, 2025 at 11:56 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi Gabor,
>>
>> I applied your 1.20 patch, and I got some very good numbers from it... so for my 5GB savepoint, I made sure I skipped all my code overhead to get the raw numbers, and I can read it in:
>> - HashMap: 4 minutes
>> - RocksDB with your patch: ~19 minutes
>> - RocksDB with the remove() commented out: 49 minutes
>>
>> I am not sure these numbers make sense, and this is on a VM on my laptop, so not exactly a good performance testing environment, but the numbers I see are pretty good. I probably need more time to ensure we definitely see all the keys, but that looks very good.
>>
>> Hope this helps
>>
>> JM
>>
>>
>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt (run 1/2)
>> ```
>> 2025-02-12 08:22:59,282 INFO LifecycleStateExplorerTest - Starting now
>> 2025-02-12 08:22:59,283 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint16385495638880357651.
>> 2025-02-12 08:41:29,066 INFO LifecycleStateExplorerTest - Completed now
>> 2025-02-12 08:41:29,066 INFO LifecycleStateExplorerTest - Duration: PT18M29.783388324S ms
>> ```
>>
>> #### using our Flink 1.20 build that removes the keysAndNamespaces.remove(); line
>> ```
>> 2025-02-12 09:03:39,018 INFO LifecycleStateExplorerTest - Starting now
>> 2025-02-12 09:03:39,024 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint10990862094634162213.
>> 2025-02-12 09:52:51,811 INFO LifecycleStateExplorerTest - Completed now
>> 2025-02-12 09:52:51,813 INFO LifecycleStateExplorerTest - Duration: PT49M12.788979538S ms
>> ```
>>
>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt (run 2/2)
>> ```
>> 2025-02-12 10:12:26,453 INFO LifecycleStateExplorerTest - Starting now
>> 2025-02-12 10:12:26,458 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint7967784813864743408.
>> 2025-02-12 10:32:20,215 INFO LifecycleStateExplorerTest - Completed now
>> 2025-02-12 10:32:20,215 INFO LifecycleStateExplorerTest - Duration: PT19M53.757274969S ms
>> ```
>>
>> #### using HashMapMemoryBackEnd (for reference)
>> ```
>> 2025-02-12 10:39:03,618 INFO LifecycleStateExplorerTest - Starting now
>> 2025-02-12 10:39:03,622 INFO c.i.a.l.s.LifecycleStateExplorer - Reading state from /tmp/mock_savepoint14340081990006677909.
>> 2025-02-12 10:43:16,454 INFO LifecycleStateExplorerTest - Completed now
>> 2025-02-12 10:43:16,457 INFO LifecycleStateExplorerTest - Duration: PT4M12.832810524S ms
>> ```
>>
>>
>> On Tue, Feb 11, 2025 at 3:39 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Yeah, I think this is more like the 1.x and 2.x incompatibility.
>>> I've just opened the PR against 1.20 which you can cherry-pick here [1].
>>>
>>> [1] https://github.com/apache/flink/pull/26145
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> So, a bit of progress:
>>>>
>>>> I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit of chopping around deprecated/changed and removed APIs - that wasn't too bad), but that failed to read the state I was using before (that was generated with Flink 1.20). This is the stack trace I get. I suspect this has more to do with state compatibility between 1.20 and 2.1... I was surprised the error is against a timer state. The end of the stack trace is below.
>>>>
>>>> I will try to apply your patch/change/PR to our 1.20 build, but it's not a simple git apply 😭😭😭.
>>>>
>>>> $ git apply --check ~/Downloads/26134.patch.txt
>>>> error: patch failed: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
>>>> error: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java: patch does not apply
>>>> error: patch failed: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
>>>> error: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java: patch does not apply
>>>> error: flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java: No such file or directory
>>>> error: flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java: No such file or directory
>>>> error: patch failed: flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
>>>> error: flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java: patch does not apply
>>>> error: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java: No such file or directory
>>>> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java: No such file or directory
>>>> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java: No such file or directory
>>>> error: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java: No such file or directory
>>>>
>>>>
>>>> stack trace if relevant...
>>>>
>>>> java.util.concurrent.CompletionException: java.lang.RuntimeException: Failed to fetch next result
>>>> at com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88) ~[classes/:?]
>>>> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[?:?]
>>>> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760) ~[?:?]
>>>> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[?:?]
>>>> at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[?:?]
>>>> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[?:?]
>>>> at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[?:?]
>>>> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[?:?]
>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>> ... many more
>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>> ... many more
>>>> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> ... 7 more
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> ... many more
>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>> ... many more
>>>> Caused by: java.io.IOException: Failed to restore timer state
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>>>> at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>>>> at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>>>> at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>>>> at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) ~[classes/:?]
>>>> at com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) ~[classes/:?]
>>>> at com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) ~[classes/:?]
>>>> at com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) ~[classes/:?]
>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>>>> at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>>>> at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>>>> at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>>>> at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44).
>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007) ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28) ~[classes/:?]
>>>> at com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49) ~[classes/:?]
>>>> at com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88) ~[classes/:?]
>>>> at com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52) ~[classes/:?]
>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
>>>> at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77) ~[?:?]
>>>> at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:?]
>>>> at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500) ~[?:?]
>>>> at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481) ~[?:?]
>>>> at com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52) ~[classes/:?]
>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66) ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337) ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>
>>>>
>>>> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> Trying to, but I'm struggling to compile my stuff against your Flink build... I tried to apply your PR as a patch on my modified 1.20 fork, and that didn't go well either. It will take time to untangle.
>>>>>
>>>>> Will keep you updated if I make progress
>>>>>
>>>>> JM
>>>>>
>>>>>
>>>>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jean-Marc,
>>>>>>
>>>>>> FYI, I've just opened this [1] PR to address the issue in a clean way. May I ask you to test it on your side?
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/26134
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Just a little update on this. We've made our first POC with the redesigned approach and the numbers are promising :)
>>>>>>> It still requires a huge effort from a development/correctness/performance perspective, but it seems like we have something in the pocket.
>>>>>>>
>>>>>>> Test data: 256Mb state file with a single operator and 2 value states
>>>>>>> - Old execution time: 25M27.126737S
>>>>>>> - New execution time: 1M19.602042S
>>>>>>> In short: ~95% performance gain.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> In short, when you don't care about multiple KeyedStateReaderFunction.readKey calls then you're on the safe side.
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>
>>>>>>>>> I am hoping that I am still good. I just read the savepoint to extract information (parallelism 1, and only 1 task manager). I also know it has been created by a job using a HashMap backend.
>>>>>>>>> And I do not care about duplicates.
>>>>>>>>>
>>>>>>>>> I should still be good, right? From what I saw, I never read any duplicate keys.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Guys,
>>>>>>>>>>
>>>>>>>>>> We've just had an in-depth analysis and we think that removing that particular line causes correctness issues under some circumstances.
>>>>>>>>>>
>>>>>>>>>> Namely, key duplicates can happen when multiple column families are processed at the same time. Needless to say, that would cause multiple `readKey` calls, which ends up in just wrong calculation logic (for example in a simple sum calculation).
>>>>>>>>>>
>>>>>>>>>> We have a vision of how this can be solved in a clean way, but it will take some time.
>>>>>>>>>>
>>>>>>>>>> > Are there any plans on a migration guide or something for users to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>
>>>>>>>>>> The gap between the two approaches is quite huge, and considering the actual bugs and improvement possibilities in the state processor API I would say this can come later on, at least on my plate. When you see the gaps and you know how to fill them, feel free to contribute and we can shepherd the PRs.
>>>>>>>>>>
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks both for your work on this!
>>>>>>>>>>>
>>>>>>>>>>> On a related note, since Queryable State (QS) is going away soon, streamlining the State Processor API as much as possible makes a lot of sense.
>>>>>>>>>>>
>>>>>>>>>>> Are there any plans on a migration guide or something for users to adapt their QS observers (beyond the current docs)? (State-)Observability-wise Flink has some room for improvement, I would say.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Salva
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your time investment and for sharing the numbers, it's super helpful. Ping me any time when you have further info to share.
>>>>>>>>>>>>
>>>>>>>>>>>> About the numbers: 48 minutes for 6Gb is not good but not terrible. I've seen petabyte scale states so I'm pretty sure we need to go beyond...
>>>>>>>>>>>>
>>>>>>>>>>>> Since we measure similar numbers with the unpatched Flink, plus this has been reported by several users, we must make changes in this area. It's still a question whether the tested patch is the right approach, but at least we've touched the root cause.
>>>>>>>>>>>>
>>>>>>>>>>>> The next step on my side is to have a deep dive and understand all the aspects of why the remove is there, how the remove elimination would affect existing use-cases, and consider all other possibilities.
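>>>>>>>>>>>>
>>>>>>>>>>>> To make the correctness concern concrete, here is a minimal sketch of a reader (the state names are made up) whose result breaks when a duplicated key triggers a second readKey call, because the per-key value gets emitted and summed twice downstream:
>>>>>>>>>>>>
>>>>>>>>>>>> ```java
>>>>>>>>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>>>>>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>>>>>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>>>>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>>>>>>>> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
>>>>>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>>>>>
>>>>>>>>>>>> // Emits one count per key; the caller sums the emitted values.
>>>>>>>>>>>> // If readKey fires twice for the same key, that key's count is
>>>>>>>>>>>> // included in the sum twice, which is the wrong result.
>>>>>>>>>>>> public class CountReader extends KeyedStateReaderFunction<String, Long> {
>>>>>>>>>>>>
>>>>>>>>>>>>     private transient ValueState<Integer> count;
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void open(Configuration parameters) {
>>>>>>>>>>>>         count = getRuntimeContext().getState(
>>>>>>>>>>>>                 new ValueStateDescriptor<>("count", Types.INT));
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
>>>>>>>>>>>>         Integer value = count.value();
>>>>>>>>>>>>         out.collect(value == null ? 0L : value.longValue());
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>> ```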
>>>>>>>>>>>>
>>>>>>>>>>>> BR,
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I finally got to run that change through. I have a 6Gb savepoint I read and parse for reference.
>>>>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 Gb of RAM)
>>>>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires less than 2Gb)
>>>>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after 12 hours... (I gave up)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think I have any duplicates, because the application that generates the savepoint is using HashMap, so my scenario may not be representative. I am using IBM Semeru Java 17 (openJ9-0.46).
>>>>>>>>>>>>>
>>>>>>>>>>>>> That was run on a VM on my laptop, so not exactly a controlled environment, but I think it's conclusive enough. I will need to run further tests, but I think we will patch our Flink, using a system property to configure it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just to give an update. I've applied the mentioned patch and the execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I need to double check what that would mean for correctness and all other aspects.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please report back on how the patch behaves, including any side effects.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Now I'm testing the state reading with the processor API vs the mentioned job where we control the keys. The difference is extreme, especially because the numbers are coming from reading a ~40Mb state file 😅
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Needless to say, the bigger number is from the processor API.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That's a good idea. Sadly I have no control over the keys...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I was going to patch Flink with the suggestion in FLINK-37109 <https://issues.apache.org/jira/browse/FLINK-37109> first to see how that goes. If that brings RocksDB performance into an acceptable range for us, we might go that way.
>>>>>>>>>>>>>>>> I really like the light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What I could imagine is to create a normal Flink job, use execution.state-recovery.path=/path/to/savepoint, and set the operator UID on a custom written operator, which opens the state info for you.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The only drawback is that you must know the keyBy range... this can be problematic, but if you can do it it's a win :)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the savepoint in pages, instead of as a single blob up front, which I think is what the hashmap does... we just want to be called for each entry and extract the bit we want in that scenario.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for nothing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not reaching the performance it should; there is an open issue for it [1]. The hashmap based approach has always required more memory, so if the memory footprint is a hard requirement then RocksDB is the only way for now.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it better. All I can promise is that I'm now measuring the performance of the RocksDB approach and intend to eliminate the slowness. Since we don't know what exactly causes the slowness, the new FRocksDB-8.10.0 can also be an improvement.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and minimise the memory consumption? We just need to transform it into something else for investigation.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend, and is spread over 6 task slots in 6 pods (under k8s). Savepoints are saved on S3. A savepoint can be 4-5Gb or more.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>> env.setParallelism(1);
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> SavepointReader savepoint =
>>>>>>>>>>>>>>>>>>>>     SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>>>>     savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>>>>         MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>>>>         new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader = mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>>>>> stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>>>>     // ... extract what we need here
>>>>>>>>>>>>>>>>>>>> });
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That works and is low on memory usage, but it is very, very slow. We noticed the iterator is available very early on, but it is slow...
>>>>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend. That is very fast, as expected. However the iterator apparently only returns once the whole savepoint has been loaded into the HashMap, so memory consumption is heavy.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so that it does not consume all the memory? Or maybe a way to read it in parts...
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> JM