Hi Gabor,

I see all my 1,331,301 keys with your patch, which is exactly what I am
expecting. If you have more specific concerns I suppose I can instrument my
code further. I have no expectation regarding the key ordering.
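
For anyone who wants to repeat this kind of check, here is a minimal sketch of an order-independent key audit. It assumes the keys emitted by the reader can be collected as strings; `KeyAudit`, `Result` and `audit` are hypothetical names for illustration and are not part of Flink or the State Processor API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyAudit {
    /** Outcome of one pass over the keys: how many were read and how many were distinct. */
    public record Result(long total, long distinct) {
        public long duplicates() {
            return total - distinct;
        }
    }

    /** Counts every key seen and tracks the distinct ones; the order of the keys is irrelevant. */
    public static Result audit(Iterable<String> keys) {
        Set<String> seen = new HashSet<>();
        long total = 0;
        for (String key : keys) {
            total++;
            seen.add(key);
        }
        return new Result(total, seen.size());
    }

    public static void main(String[] args) {
        // Hypothetical sample keys; a real run would feed the keys emitted by the reader.
        Result result = audit(List.of("k1", "k3", "k2", "k3"));
        System.out.println(result.total() + " read, " + result.distinct()
                + " distinct, " + result.duplicates() + " duplicate(s)");
    }
}
```

Comparing `distinct` with the expected key count and checking that `duplicates` is zero covers both the "all keys seen" and the "no repeated readKey" concerns, at the cost of holding every key in memory.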

JM


On Wed, Feb 12, 2025 at 11:29 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> I think the HashMap vs. patched RocksDB numbers make sense; at least I'm
> measuring similar numbers with relatively small states.
> The RocksDB number with remove() commented out is surprisingly high, but
> since that change causes correctness issues under some circumstances I
> would abandon it.
>
> > I probably need more time to ensure we definitely see all the keys, but
> that looks very good.
>
> Yeah, correctness is key here so waiting on your results.
>
> BR,
> G
>
>
> On Wed, Feb 12, 2025 at 11:56 AM Jean-Marc Paulin <j...@paulin.co.uk> wrote:
>
>> Hi Gabor,
>>
>> I applied your 1.20 patch and got some very good numbers from it. For my
>> 5 GB savepoint, I made sure to skip all my code overhead to get the raw
>> numbers, and I can read it in:
>> - HashMap: 4 minutes
>> - RocksDB with your patch: ~19 minutes
>> - RocksDB with the remove() commented out: 49 minutes
>>
>> I am not sure these numbers make sense, and this is on a VM on my laptop,
>> so not exactly a good performance testing environment, but the numbers I
>> see are pretty good. I probably need more time to ensure we definitely see
>> all the keys, but that looks very good.
>>
>> Hope this helps
>>
>> JM
>>
>>
>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: ( run 1/2)
>> ```
>> 2025-02-12 08:22:59,282 INFO  LifecycleStateExplorerTest  - Starting now
>> 2025-02-12 08:22:59,283 INFO  c.i.a.l.s.LifecycleStateExplorer  - Reading
>> state from /tmp/mock_savepoint16385495638880357651.
>> 2025-02-12 08:41:29,066 INFO  LifecycleStateExplorerTest  - Completed now
>> 2025-02-12 08:41:29,066 INFO  LifecycleStateExplorerTest  - Duration:
>> PT18M29.783388324S ms
>> ```
>>
>> #### using our flink 1.20 build, that removes the
>> keysAndNamespaces.remove(); line
>> ```
>> 2025-02-12 09:03:39,018 INFO  LifecycleStateExplorerTest  - Starting now
>> 2025-02-12 09:03:39,024 INFO  c.i.a.l.s.LifecycleStateExplorer  - Reading
>> state from /tmp/mock_savepoint10990862094634162213.
>> 2025-02-12 09:52:51,811 INFO  LifecycleStateExplorerTest  - Completed now
>> 2025-02-12 09:52:51,813 INFO  LifecycleStateExplorerTest  - Duration:
>> PT49M12.788979538S ms
>> ```
>>
>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: (run 2/2)
>> ```
>> 2025-02-12 10:12:26,453 INFO  LifecycleStateExplorerTest  - Starting now
>> 2025-02-12 10:12:26,458 INFO  c.i.a.l.s.LifecycleStateExplorer  - Reading
>> state from /tmp/mock_savepoint7967784813864743408.
>> 2025-02-12 10:32:20,215 INFO  LifecycleStateExplorerTest  - Completed now
>> 2025-02-12 10:32:20,215 INFO  LifecycleStateExplorerTest  - Duration:
>> PT19M53.757274969S ms
>> ```
>>
>> #### using HashMapMemoryBackEnd (for reference)
>> ```
>> 2025-02-12 10:39:03,618 INFO  LifecycleStateExplorerTest  - Starting now
>> 2025-02-12 10:39:03,622 INFO  c.i.a.l.s.LifecycleStateExplorer  - Reading
>> state from /tmp/mock_savepoint14340081990006677909.
>> 2025-02-12 10:43:16,454 INFO  LifecycleStateExplorerTest  - Completed now
>> 2025-02-12 10:43:16,457 INFO  LifecycleStateExplorerTest  - Duration:
>> PT4M12.832810524S ms
>> ```
>>
>>
>>
>>
>>
>> On Tue, Feb 11, 2025 at 3:39 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Yeah, I think this is more like the 1.x and 2.x incompatibility.
>>> I've just opened the PR against 1.20 which you can cherry-pick here [1].
>>>
>>> [1] https://github.com/apache/flink/pull/26145
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>> wrote:
>>>
>>>> Hi Gabor,
>>>>
>>>> So, a bit of progress,
>>>>
>>>> I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit of
>>>> chopping around deprecated/changed and removed APIs - that wasn't too bad),
>>>> but that failed to read the state I was using before (that was generated
>>>> with a Flink 1.20). This is the stack trace I get. I suspect this has more
>>>> to do with state compatibility between 1.20 and 2.1... I was surprised the
>>>> error is against a timer state. The end of the stack trace is below.
>>>>
>>>> I will try to apply your patch/change/PR to our 1.20 build, but it's
>>>> not a simple git apply 😭😭😭.
>>>>
>>>> $ git apply --check ~/Downloads/26134.patch.txt
>>>> error: patch failed:
>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
>>>> error:
>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
>>>> patch does not apply
>>>> error: patch failed:
>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
>>>> error:
>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:
>>>> patch does not apply
>>>> error:
>>>> flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java:
>>>> No such file or directory
>>>> error:
>>>> flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java:
>>>> No such file or directory
>>>> error: patch failed:
>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
>>>> error:
>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:
>>>> patch does not apply
>>>> error:
>>>> flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java:
>>>> No such file or directory
>>>> error:
>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java:
>>>> No such file or directory
>>>> error:
>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java:
>>>> No such file or directory
>>>> error:
>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java:
>>>> No such file or directory
>>>>
>>>>
>>>> stack trace if relevant...
>>>>
>>>> java.util.concurrent.CompletionException: java.lang.RuntimeException:
>>>> Failed to fetch next result
>>>>     at
>>>> com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88)
>>>> ~[classes/:?]
>>>>     at
>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
>>>> ~[?:?]
>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>     ... many more
>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>     ... many more
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>> failed.
>>>>     ... 7 more
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>>     ... many more
>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>> suppressed by NoRestartBackoffTimeStrategy
>>>>     ... many more
>>>> Caused by: java.io.IOException: Failed to restore timer state
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>     at
>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>> Method) ~[?:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>> ~[?:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>> ~[?:?]
>>>>     at
>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>> ~[classes/:?]
>>>>     at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>     at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
>>>> ~[classes/:?]
>>>>     at
>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
>>>> ~[classes/:?]
>>>>     at
>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
>>>> ~[classes/:?]
>>>>     at
>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
>>>> ~[classes/:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>> Method) ~[?:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>> ~[?:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>> ~[?:?]
>>>>     at
>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>> ~[classes/:?]
>>>>     at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>>> serializer
>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a)
>>>> must not be incompatible with the old state serializer
>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44).
>>>>     at
>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858)
>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763)
>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020)
>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007)
>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
>>>> ~[classes/:?]
>>>>     at
>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
>>>> ~[classes/:?]
>>>>     at
>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
>>>> ~[classes/:?]
>>>>     at
>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
>>>> ~[classes/:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>> Method) ~[?:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>> ~[?:?]
>>>>     at
>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>> ~[?:?]
>>>>     at
>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>> ~[?:?]
>>>>     at
>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>> ~[classes/:?]
>>>>     at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>     at
>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>
>>>>
>>>>
>>>> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> Trying to, but I am struggling to compile my stuff against your Flink
>>>>> build. I tried to apply your PR as a patch on my modified 1.20 fork and
>>>>> that didn't go well either. It will take time to untangle.
>>>>>
>>>>> Will keep you updated if I make progress
>>>>>
>>>>> JM
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jean-Marc,
>>>>>>
>>>>>> FYI, I've just opened this [1] PR to address the issue in a clean way.
>>>>>> May I ask you to test it on your side?
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/26134
>>>>>>
>>>>>> BR,
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Just a little update on this. We've made our first POC with the
>>>>>>> redesigned approach and the numbers are promising :)
>>>>>>> It still requires a huge effort from a development, correctness and
>>>>>>> performance perspective, but it seems like we have something in our
>>>>>>> pocket.
>>>>>>>
>>>>>>> Test data: 256 MB state file with a single operator and 2 value states
>>>>>>> - Old execution time: 25M27.126737S
>>>>>>> - New execution time: 1M19.602042S
>>>>>>> In short: ~95% reduction in execution time.
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> In short, when you don't care about
>>>>>>>> multiple KeyedStateReaderFunction.readKey calls then you're on the safe
>>>>>>>> side.
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I am hoping that I am still good. I just read the savepoint to
>>>>>>>>> extract information (parallelism 1, and only 1 task manager). I also
>>>>>>>>> know it has been created by a job using a HashMap backend. And I do
>>>>>>>>> not care about duplicates.
>>>>>>>>>
>>>>>>>>> I should still be good, right? From what I saw I never read any
>>>>>>>>> duplicate keys.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> JM
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <
>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Guys,
>>>>>>>>>>
>>>>>>>>>> We've just had an in-depth analysis and we think that removing
>>>>>>>>>> that particular line causes correctness issues under some
>>>>>>>>>> circumstances.
>>>>>>>>>>
>>>>>>>>>> Namely, key duplicates can happen when multiple column families
>>>>>>>>>> are processed at the same time. Needless to say, that would cause
>>>>>>>>>> multiple `readKey` calls for the same key, which ends up in simply
>>>>>>>>>> wrong calculation logic (for example in a simple sum calculation).
>>>>>>>>>>
>>>>>>>>>> We have a vision of how this can be solved in a clean way but it will
>>>>>>>>>> take some time.
>>>>>>>>>>
>>>>>>>>>> > Are there any plans on a migration guide or something for users
>>>>>>>>>> to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>
>>>>>>>>>> The gap between the two approaches is quite large, and considering
>>>>>>>>>> the actual bugs and improvement possibilities in the State Processor
>>>>>>>>>> API I would say this can come later on, at least on my plate. When you
>>>>>>>>>> see the gaps and you know how to fill them, feel free to contribute
>>>>>>>>>> and we can shepherd the PRs.
>>>>>>>>>>
>>>>>>>>>> G
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <
>>>>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks both for your work on this!
>>>>>>>>>>>
>>>>>>>>>>> On a related note, since Queryable State (QS) is going away
>>>>>>>>>>> soon, streamlining the State Processor API as much as possible
>>>>>>>>>>> makes a lot of sense.
>>>>>>>>>>>
>>>>>>>>>>> Are there any plans on a migration guide or something for users
>>>>>>>>>>> to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>> (State-)Observability-wise Flink has some room for improvement, I
>>>>>>>>>>> would say.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Salva
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <
>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for investing the time and sharing the numbers, it's
>>>>>>>>>>>> super helpful.
>>>>>>>>>>>> Ping me any time when you have further info to share.
>>>>>>>>>>>>
>>>>>>>>>>>> About the numbers: 48 minutes for 6 GB is not good but not
>>>>>>>>>>>> terrible.
>>>>>>>>>>>> I've seen petabyte scale states so I'm pretty sure we need to
>>>>>>>>>>>> go beyond...
>>>>>>>>>>>>
>>>>>>>>>>>> Since we measure similar numbers with the unpatched Flink, and
>>>>>>>>>>>> this has been reported by several users,
>>>>>>>>>>>> we must make changes in this area. It's still a question
>>>>>>>>>>>> whether the tested patch is the right approach
>>>>>>>>>>>> but at least we've touched the root cause.
>>>>>>>>>>>>
>>>>>>>>>>>> The next step on my side is to do a deep dive and understand
>>>>>>>>>>>> all the aspects of why the remove is there,
>>>>>>>>>>>> how eliminating the remove would affect existing use cases, and
>>>>>>>>>>>> consider all other possibilities.
>>>>>>>>>>>>
>>>>>>>>>>>> BR,
>>>>>>>>>>>> G
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <
>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I finally got to run that change through. I have a 6 GB
>>>>>>>>>>>>> savepoint I read and parse for reference.
>>>>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires
>>>>>>>>>>>>> less than 2 GB)
>>>>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after
>>>>>>>>>>>>> 12 hours... (I gave up)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I don't think I have any duplicates because the
>>>>>>>>>>>>> application that generates the savepoint is using HashMap, so my
>>>>>>>>>>>>> scenario may not be representative. I am using IBM Semeru Java 17
>>>>>>>>>>>>> (OpenJ9-0.46).
>>>>>>>>>>>>>
>>>>>>>>>>>>> That was run on a VM on my laptop, so not exactly a
>>>>>>>>>>>>> controlled environment, but I think it's conclusive enough. I
>>>>>>>>>>>>> will need to run further tests but I think we will patch our
>>>>>>>>>>>>> Flink, using a system property to configure it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope this helps
>>>>>>>>>>>>>
>>>>>>>>>>>>> JM
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just to give an update. I've applied the mentioned patch and
>>>>>>>>>>>>>> the execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader   
>>>>>>>>>>>>>>                 [] - Execution time: PT14.690426S
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I need to double check what that would mean for correctness
>>>>>>>>>>>>>> and all other aspects.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Please report back on how the patch behaves including any
>>>>>>>>>>>>>>> side effects.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm now testing state reading with the State Processor API vs.
>>>>>>>>>>>>>>> the mentioned job where we control the keys.
>>>>>>>>>>>>>>> The difference is extreme, especially because the numbers
>>>>>>>>>>>>>>> come from reading a ~40 MB state file 😅
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader  
>>>>>>>>>>>>>>>                  [] - Execution time: PT22M24.612954S
>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO  
>>>>>>>>>>>>>>> o.a.f.e.s.r.FlinkTestStateReaderJob                [] - 
>>>>>>>>>>>>>>> Execution time: PT6.930659S
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Needless to say, the bigger number is the State Processor API.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <
>>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That's a good idea. Sadly, I have no control over the
>>>>>>>>>>>>>>>> keys...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I was going to patch Flink with the suggestion in
>>>>>>>>>>>>>>>> FLINK-37109
>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first
>>>>>>>>>>>>>>>> to see how that goes. If that brings RocksDB performance into
>>>>>>>>>>>>>>>> an acceptable range for us we might go that way. I really like
>>>>>>>>>>>>>>>> the light memory consumption of RocksDB for that kind of side job.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>>>>>>>>>>>>>> set the operator UID on a custom-written operator, which
>>>>>>>>>>>>>>>>> opens the state info for you.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The only drawback is that you must know the keyBy range...
>>>>>>>>>>>>>>>>> this can be problematic, but if you can do it, it's a win :)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <
>>>>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the
>>>>>>>>>>>>>>>>>> savepoint in pages, instead of as a single blob up front 
>>>>>>>>>>>>>>>>>> which I think is
>>>>>>>>>>>>>>>>>> what the hashmap does... we just want to be called for each 
>>>>>>>>>>>>>>>>>> entry and
>>>>>>>>>>>>>>>>>> extract the bit we want in that scenario.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>>>>>>>> nothing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>>>>>>>> reaching the performance it should. There is an open
>>>>>>>>>>>>>>>>>>> issue for it [1].
>>>>>>>>>>>>>>>>>>> The HashMap-based approach has always required more memory,
>>>>>>>>>>>>>>>>>>> and still does. So if the memory footprint is a hard
>>>>>>>>>>>>>>>>>>> requirement then RocksDB is the only way for now.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make
>>>>>>>>>>>>>>>>>>> it better. All I can promise is that I'm now measuring the
>>>>>>>>>>>>>>>>>>> performance of the RocksDB approach
>>>>>>>>>>>>>>>>>>> and intend to eliminate the slowness. Since we don't
>>>>>>>>>>>>>>>>>>> know what exactly causes the slowness, the new
>>>>>>>>>>>>>>>>>>> Frocksdb-8.10.0 may also be an improvement.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>>>>>>>> minimise the memory consumption? We just need to transform
>>>>>>>>>>>>>>>>>>>> it into something else for investigation.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend,
>>>>>>>>>>>>>>>>>>>> and is spread over 6 task slots in 6 pods (under k8s).
>>>>>>>>>>>>>>>>>>>> Savepoints are saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation,
>>>>>>>>>>>>>>>>>>>>     //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>>>>>     stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>>>>         // ... extract what we need here
>>>>>>>>>>>>>>>>>>>>     });
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend.
>>>>>>>>>>>>>>>>>>>> That works and is low on memory usage, but is very, very
>>>>>>>>>>>>>>>>>>>> slow. We noticed the iterator is available very early on, but it
>>>>>>>>>>>>>>>>>>>> is slow...
>>>>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap
>>>>>>>>>>>>>>>>>>>> backend. That is very fast, as expected. However, the
>>>>>>>>>>>>>>>>>>>> iterator apparently only returns once the whole savepoint has been
>>>>>>>>>>>>>>>>>>>> loaded into the HashMap, hence the heavy memory consumption.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it
>>>>>>>>>>>>>>>>>>>> so that it does not consume all the memory? Or maybe
>>>>>>>>>>>>>>>>>>>> reading it in parts...
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
