Hi Jean-Marc,

Thanks for your efforts! We've done quite extensive tests internally and they
are all passing. Good to see that the number of keys matches on your side
too.
Regarding key ordering, the state processor API does not give any ordering
guarantees.
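
Because ordering is not guaranteed, a completeness check has to compare key sets rather than key sequences. The following is a minimal sketch of such a check, not code from the patch: the class name `KeyCheck`, the method `sameKeys` and the sample keys are hypothetical and only illustrate the idea.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyCheck {
    /**
     * Returns true when the actual read saw exactly the expected keys,
     * in any order, and saw no key more than once.
     */
    public static boolean sameKeys(List<String> expected, List<String> actual) {
        Set<String> expectedSet = new HashSet<>(expected);
        Set<String> actualSet = new HashSet<>(actual);
        // A duplicate in the actual read shrinks the set below the list size.
        boolean noDuplicates = actualSet.size() == actual.size();
        return noDuplicates && expectedSet.equals(actualSet);
    }

    public static void main(String[] args) {
        // Hypothetical keys: same content, different order.
        List<String> referenceRead = List.of("k1", "k2", "k3");
        List<String> patchedRead = List.of("k3", "k1", "k2");
        System.out.println(sameKeys(referenceRead, patchedRead));
    }
}
```

Comparing sets keeps the check independent of whichever order a backend happens to return keys in, while the size comparison still flags duplicate keys.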

All in all I'm confident in the solution, so green light from my side. I
intend to keep the PR open for some time so that everybody has the chance
to improve it or raise any concerns.

BR,
G


On Wed, Feb 12, 2025 at 1:21 PM Jean-Marc Paulin <jm.pau...@gmail.com>
wrote:

> Hi Gabor,
>
> I see all my 1,331,301 keys with your patch, which is exactly what I am
> expecting. If you have more specific concerns I suppose I can instrument my
> code further. I have no expectation regarding the key ordering.
>
> JM
>
>
> On Wed, Feb 12, 2025 at 11:29 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> I think the hashmap vs patched RocksDB difference makes sense; at least
>> I'm measuring similar numbers with relatively small states.
>> The RocksDB time with remove() commented out is surprisingly high, but
>> since that change causes correctness issues under some circumstances I
>> would abandon it.
>>
>> > I probably need more time to ensure we definitely see all the keys, but
>> that looks very good.
>>
>> Yeah, correctness is key here so waiting on your results.
>>
>> BR,
>> G
>>
>>
>> On Wed, Feb 12, 2025 at 11:56 AM Jean-Marc Paulin <j...@paulin.co.uk>
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> I applied your 1.20 patch, and I got some very good numbers from it...
>>> so for my 5GB savepoint, I made sure I skip all my code overhead to get the
>>> raw number, and I can read it in
>>> - HashMap : 4 minutes
>>> - RocksDB with your patch: ~19 minutes
>>> - RocksDB commenting out the remove(): 49 minutes
>>>
>>> I am not sure these numbers make sense, and this is on a VM on my
>>> laptop, so not exactly a good performance testing environment, but the
>>> numbers I see are pretty good. I probably need more time to ensure we
>>> definitely see all the keys, but that looks very good.
>>>
>>> Hope this helps
>>>
>>> JM
>>>
>>>
>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: ( run 1/2)
>>> ```
>>> 2025-02-12 08:22:59,282 INFO  LifecycleStateExplorerTest  - Starting now
>>> 2025-02-12 08:22:59,283 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>> Reading state from /tmp/mock_savepoint16385495638880357651.
>>> 2025-02-12 08:41:29,066 INFO  LifecycleStateExplorerTest  - Completed now
>>> 2025-02-12 08:41:29,066 INFO  LifecycleStateExplorerTest  - Duration:
>>> PT18M29.783388324S ms
>>> ```
>>>
>>> #### using our flink 1.20 build, that removes the
>>> keysAndNamespaces.remove(); line
>>> ```
>>> 2025-02-12 09:03:39,018 INFO  LifecycleStateExplorerTest  - Starting now
>>> 2025-02-12 09:03:39,024 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>> Reading state from /tmp/mock_savepoint10990862094634162213.
>>> 2025-02-12 09:52:51,811 INFO  LifecycleStateExplorerTest  - Completed now
>>> 2025-02-12 09:52:51,813 INFO  LifecycleStateExplorerTest  - Duration:
>>> PT49M12.788979538S ms
>>> ```
>>>
>>> #### using 1.20-SNAPSHOT + FLINK-37109_1.20.patch.txt: (run 2/2)
>>> ```
>>> 2025-02-12 10:12:26,453 INFO  LifecycleStateExplorerTest  - Starting now
>>> 2025-02-12 10:12:26,458 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>> Reading state from /tmp/mock_savepoint7967784813864743408.
>>> 2025-02-12 10:32:20,215 INFO  LifecycleStateExplorerTest  - Completed now
>>> 2025-02-12 10:32:20,215 INFO  LifecycleStateExplorerTest  - Duration:
>>> PT19M53.757274969S ms
>>> ```
>>>
>>> #### using HashMapMemoryBackEnd (for reference)
>>> ```
>>> 2025-02-12 10:39:03,618 INFO  LifecycleStateExplorerTest  - Starting now
>>> 2025-02-12 10:39:03,622 INFO  c.i.a.l.s.LifecycleStateExplorer  -
>>> Reading state from /tmp/mock_savepoint14340081990006677909.
>>> 2025-02-12 10:43:16,454 INFO  LifecycleStateExplorerTest  - Completed now
>>> 2025-02-12 10:43:16,457 INFO  LifecycleStateExplorerTest  - Duration:
>>> PT4M12.832810524S ms
>>> ```
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Feb 11, 2025 at 3:39 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Yeah, I think this is more like the 1.x and 2.x incompatibility.
>>>> I've just opened the PR against 1.20 which you can cherry-pick here [1].
>>>>
>>>> [1] https://github.com/apache/flink/pull/26145
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Tue, Feb 11, 2025 at 4:19 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>> wrote:
>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> So, a bit of progress,
>>>>>
>>>>> I managed to compile our stuff against your 2.1-SNAPSHOT (with a bit
>>>>> of chopping around deprecated, changed and removed APIs; that wasn't
>>>>> too bad), but that failed to read the state I was using before (which
>>>>> was generated with Flink 1.20). This is the stack trace I get. I
>>>>> suspect this has more to do with state compatibility between 1.20 and
>>>>> 2.1... I was surprised the error is against a timer state. The end of
>>>>> the stack trace is below.
>>>>>
>>>>> I will try to apply your patch/change/PR to our 1.20 build, but it's
>>>>> not a simple git apply 😭😭😭.
>>>>>
>>>>> $ git apply --check ~/Downloads/26134.patch.txt
>>>>> error: patch failed:
>>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:226
>>>>> error:
>>>>> flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
>>>>> patch does not apply
>>>>> error: patch failed:
>>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:24
>>>>> error:
>>>>> flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java:
>>>>> patch does not apply
>>>>> error:
>>>>> flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java:
>>>>> No such file or directory
>>>>> error:
>>>>> flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/TestStateBackend.java:
>>>>> No such file or directory
>>>>> error: patch failed:
>>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:22
>>>>> error:
>>>>> flink-state-backends/flink-statebackend-common/src/test/java/org/apache/flink/state/common/PeriodicMaterializationManagerTest.java:
>>>>> patch does not apply
>>>>> error:
>>>>> flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java:
>>>>> No such file or directory
>>>>> error:
>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/RocksDBKeyedStateBackend.java:
>>>>> No such file or directory
>>>>> error:
>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/AbstractRocksStateKeysIterator.java:
>>>>> No such file or directory
>>>>> error:
>>>>> flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/iterator/RocksStateKeysIterator.java:
>>>>> No such file or directory
>>>>>
>>>>>
>>>>> stack trace if relevant...
>>>>>
>>>>> java.util.concurrent.CompletionException: java.lang.RuntimeException:
>>>>> Failed to fetch next result
>>>>>     at
>>>>> com.ibm.aiops.lifecycle.stateexplorer.LifecycleStateExplorer.lambda$0(LifecycleStateExplorer.java:88)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
>>>>> ~[?:?]
>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>>     ... many more
>>>>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>>>>>     ... many more
>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution 
>>>>> failed.
>>>>>     ... 7 more
>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>>> execution failed.
>>>>>     ... many more
>>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>>> suppressed by NoRestartBackoffTimeStrategy
>>>>>     ... many more
>>>>> Caused by: java.io.IOException: Failed to restore timer state
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:193)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>>>     at
>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>> Method) ~[?:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>>> ~[?:?]
>>>>>     at
>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>>     at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:138)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>> Method) ~[?:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>>> ~[?:?]
>>>>>     at
>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>> Caused by: org.apache.flink.util.StateMigrationException: The new
>>>>> state serializer
>>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@b45c430a)
>>>>> must not be incompatible with the old state serializer
>>>>> (org.apache.flink.api.common.typeutils.base.MapSerializer@edecbe44).
>>>>>     at
>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:858)
>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:763)
>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1020)
>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:1007)
>>>>> ~[flink-statebackend-rocksdb-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:384)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:435)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:150)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:135)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getMapState(SavepointRuntimeContext.java:232)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> com.ibm.aiops.flink.state.RuntimeContextStateProvider.getState(RuntimeContextStateProvider.java:28)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> com.ibm.aiops.flink.state.StateManager.getState(StateManager.java:49)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> com.ibm.aiops.lifecycle.policy.execution.state.PolicyExecutionState.<init>(PolicyExecutionState.java:88)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> com.ibm.aiops.lifecycle.policy.execution.state.RequestExecutionStateManager.<init>(RequestExecutionStateManager.java:52)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>>> Method) ~[?:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
>>>>> ~[?:?]
>>>>>     at
>>>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
>>>>> ~[?:?]
>>>>>     at
>>>>> com.ibm.aiops.flink.state.StateManagerJsonNodeReaderFunction.open(StateManagerJsonNodeReaderFunction.java:52)
>>>>> ~[classes/:?]
>>>>>     at
>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>>>>> ~[flink-core-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:190)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>>> ~[flink-state-processor-api-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.functions.source.legacy.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:107)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:68)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:337)
>>>>> ~[flink-runtime-2.1-SNAPSHOT.jar:2.1-SNAPSHOT]
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 11, 2025 at 12:16 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>> wrote:
>>>>>
>>>>>> Hi Gabor,
>>>>>>
>>>>>> I'm trying to, but I'm struggling to compile my stuff against your
>>>>>> Flink build... I tried to apply your PR as a patch on my modified 1.20
>>>>>> fork and that didn't go well either. It will take time to untangle.
>>>>>>
>>>>>> Will keep you updated if I make progress
>>>>>>
>>>>>> JM
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 10, 2025 at 8:22 PM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jean-Marc,
>>>>>>>
>>>>>>> FYI, I've just opened this [1] PR to address the issue in a clean
>>>>>>> way.
>>>>>>> May I ask you to test it on your side?
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/pull/26134
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Feb 7, 2025 at 6:14 PM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Just a little update on this. We've made our first POC with the
>>>>>>>> redesigned approach and the numbers are promising :)
>>>>>>>> It still requires a lot of effort on the development, correctness
>>>>>>>> and performance side, but it seems we have something in hand.
>>>>>>>>
>>>>>>>> Test data: 256 MB state file with a single operator and 2 value
>>>>>>>> states
>>>>>>>> - Old execution time: 25M27.126737S
>>>>>>>> - New execution time: 1M19.602042S
>>>>>>>> In short: ~95% performance gain.
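
The quoted gain can be checked from the two execution times. A minimal sketch, assuming the times are ISO-8601 durations with the leading `PT` omitted (the prefix is added here so `java.time.Duration` can parse them); the class and method names are hypothetical.

```java
import java.time.Duration;

public class GainCheck {
    /** Percentage of the old run time that the new run saves. */
    public static double reductionPercent(Duration oldRun, Duration newRun) {
        double oldNanos = oldRun.toNanos();
        double newNanos = newRun.toNanos();
        return 100.0 * (1.0 - newNanos / oldNanos);
    }

    public static void main(String[] args) {
        // Values from the message above, with the assumed "PT" prefix added.
        Duration oldRun = Duration.parse("PT25M27.126737S");
        Duration newRun = Duration.parse("PT1M19.602042S");
        System.out.printf("Run time reduced by %.1f%%%n",
                reductionPercent(oldRun, newRun));
    }
}
```

For these two values the reduction works out to about 94.8%, in line with the ~95% stated above.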
>>>>>>>>
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Feb 6, 2025 at 9:06 AM Gabor Somogyi <
>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> In short, when you don't care about multiple
>>>>>>>>> KeyedStateReaderFunction.readKey calls then you're on the safe
>>>>>>>>> side.
>>>>>>>>>
>>>>>>>>> G
>>>>>>>>>
>>>>>>>>> On Wed, Feb 5, 2025 at 6:27 PM Jean-Marc Paulin <j...@paulin.co.uk>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I am still hoping that I am good. I just read the savepoint to
>>>>>>>>>> extract information (parallelism 1, and only 1 task manager). I
>>>>>>>>>> also know it has been created by a job using a HashMap backend.
>>>>>>>>>> And I do not care about duplicates.
>>>>>>>>>>
>>>>>>>>>> I should still be good, right? From what I saw I never read any
>>>>>>>>>> duplicate keys.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> JM
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 5, 2025 at 4:58 PM Gabor Somogyi <
>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Guys,
>>>>>>>>>>>
>>>>>>>>>>> We've just had an in-depth analysis and we think that removing
>>>>>>>>>>> that particular line causes correctness issues under some
>>>>>>>>>>> circumstances.
>>>>>>>>>>>
>>>>>>>>>>> Namely, key duplicates can happen when multiple column families
>>>>>>>>>>> are processed at the same time. Needless to say, that would cause
>>>>>>>>>>> multiple `readKey` calls, which ends up in simply wrong
>>>>>>>>>>> calculation logic (for example in a simple sum calculation).
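
To make the sum example concrete, here is a small sketch in plain Java. It does not use the Flink API: the list `keysSeen` stands in for the sequence of keys a reader function would be called with, and all names and values are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DuplicateKeySum {
    /** Adds the value of every key handed over, as a naive reader would. */
    public static long naiveSum(List<String> keysSeen, Map<String, Long> values) {
        long sum = 0;
        for (String key : keysSeen) {
            sum += values.get(key);
        }
        return sum;
    }

    /** Same sum, but each key contributes only the first time it is seen. */
    public static long dedupedSum(List<String> keysSeen, Map<String, Long> values) {
        Set<String> seen = new HashSet<>();
        long sum = 0;
        for (String key : keysSeen) {
            if (seen.add(key)) { // add() returns false for a repeated key
                sum += values.get(key);
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> values = Map.of("a", 10L, "b", 5L);
        // Key "a" is delivered twice, as a duplicate would be.
        List<String> keysSeen = List.of("a", "b", "a");
        System.out.println("naive:   " + naiveSum(keysSeen, values));   // 25
        System.out.println("deduped: " + dedupedSum(keysSeen, values)); // 15
    }
}
```

Tracking seen keys costs memory proportional to the number of keys, so this only illustrates the failure mode; it is not meant as a fix.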
>>>>>>>>>>>
>>>>>>>>>>> We have an idea of how this can be solved in a clean way but it
>>>>>>>>>>> will take some time.
>>>>>>>>>>>
>>>>>>>>>>> > Are there any plans on a migration guide or something for
>>>>>>>>>>> users to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>>
>>>>>>>>>>> The gap between the two approaches is quite large, and
>>>>>>>>>>> considering the actual bugs and improvement possibilities in the
>>>>>>>>>>> state processor API, I would say this can come later, at least
>>>>>>>>>>> on my plate. If you see the gaps and know how to fill them, feel
>>>>>>>>>>> free to contribute and we can shepherd the PRs.
>>>>>>>>>>>
>>>>>>>>>>> G
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 5, 2025 at 10:57 AM Salva Alcántara <
>>>>>>>>>>> salcantara...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks both for your work on this!
>>>>>>>>>>>>
>>>>>>>>>>>> On a related note, since Queryable State (QS) is going away
>>>>>>>>>>>> soon, streamlining the State Processor API as much as possible
>>>>>>>>>>>> makes a lot of sense.
>>>>>>>>>>>>
>>>>>>>>>>>> Are there any plans on a migration guide or something for users
>>>>>>>>>>>> to adapt their QS observers (beyond the current docs)?
>>>>>>>>>>>> (State-)Observability-wise Flink has some room for improvement,
>>>>>>>>>>>> I would say.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Salva
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:36 AM Gabor Somogyi <
>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for investing the time and sharing the numbers, it's
>>>>>>>>>>>>> super helpful.
>>>>>>>>>>>>> Ping me any time when you have further info to share.
>>>>>>>>>>>>>
>>>>>>>>>>>>> About the numbers: 48 minutes for 6 GB is not good but not
>>>>>>>>>>>>> terrible.
>>>>>>>>>>>>> I've seen petabyte scale states so I'm pretty sure we need to
>>>>>>>>>>>>> go beyond...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Since we measure similar numbers with the unpatched Flink, and
>>>>>>>>>>>>> this has been reported by several users,
>>>>>>>>>>>>> we must make changes in this area. It's still a question
>>>>>>>>>>>>> whether the tested patch is the right approach
>>>>>>>>>>>>> but at least we've touched the root cause.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The next step on my side is to do a deep dive and understand
>>>>>>>>>>>>> all the aspects of why the remove is there,
>>>>>>>>>>>>> how eliminating the remove would affect existing use cases, and
>>>>>>>>>>>>> consider all other possibilities.
>>>>>>>>>>>>>
>>>>>>>>>>>>> BR,
>>>>>>>>>>>>> G
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Feb 5, 2025 at 9:05 AM Jean-Marc Paulin <
>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I finally got to run that change through. I have a 6 GB
>>>>>>>>>>>>>> savepoint I read and parse for reference.
>>>>>>>>>>>>>> - HashMap reads it in 14 minutes (but requires 10 GB of RAM)
>>>>>>>>>>>>>> - RocksDB with the patch reads it in 48 minutes (and requires
>>>>>>>>>>>>>> less than 2 GB)
>>>>>>>>>>>>>> - RocksDB without the patch wasn't even halfway through after
>>>>>>>>>>>>>> 12 hours... (I gave up)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't think I have any duplicates because the application
>>>>>>>>>>>>>> that generates the savepoint is using HashMap, so my scenario
>>>>>>>>>>>>>> may not be representative. I am using IBM Semeru Java 17
>>>>>>>>>>>>>> (openJ9-0.46).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That was run on a VM on my laptop, so not exactly a controlled
>>>>>>>>>>>>>> environment, but I think it's conclusive enough. I will need
>>>>>>>>>>>>>> to run further tests, but I think we will patch our Flink,
>>>>>>>>>>>>>> using a system property to configure it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope this helps
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 4:01 PM Gabor Somogyi <
>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Just to give an update. I've applied the mentioned patch and
>>>>>>>>>>>>>>> the execution time drastically decreased (the gain is 98.9%):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2025-02-04 16:52:54,448 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT14.690426S
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I need to double check what that would mean to correctness
>>>>>>>>>>>>>>> and all other aspects.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 2:26 PM Gabor Somogyi <
>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Please report back on how the patch behaves including any
>>>>>>>>>>>>>>>> side effects.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm now testing state reading with the processor API vs
>>>>>>>>>>>>>>>> the mentioned job where we control the keys.
>>>>>>>>>>>>>>>> The difference is extreme, especially because the numbers
>>>>>>>>>>>>>>>> are coming from reading a ~40 MB state file 😅
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2025-02-04 13:21:53,580 INFO  o.a.f.e.s.r.FlinkTestStateReader [] - Execution time: PT22M24.612954S
>>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>> 2025-02-04 13:39:14,704 INFO  o.a.f.e.s.r.FlinkTestStateReaderJob [] - Execution time: PT6.930659S
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Needless to say, the bigger one is the processor API.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 1:40 PM Jean-Marc Paulin <
>>>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> That's a good idea. Sadly, I have no control over the
>>>>>>>>>>>>>>>>> keys...
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I was going to patch Flink with the suggestion in
>>>>>>>>>>>>>>>>> FLINK-37109
>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-37109> first
>>>>>>>>>>>>>>>>> to see how that goes. If that brings RocksDB performance
>>>>>>>>>>>>>>>>> into an acceptable range for us we might go that way. I
>>>>>>>>>>>>>>>>> really like the light memory consumption of RocksDB for
>>>>>>>>>>>>>>>>> that kind of side job.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:23 PM Gabor Somogyi <
>>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What I could imagine is to create a normal Flink job,
>>>>>>>>>>>>>>>>>> use execution.state-recovery.path=/path/to/savepoint, and
>>>>>>>>>>>>>>>>>> set the operator UID on a custom written operator, which
>>>>>>>>>>>>>>>>>> opens the state info for you.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The only drawback is that you must know the keyBy
>>>>>>>>>>>>>>>>>> range... this can be problematic, but if you can do it,
>>>>>>>>>>>>>>>>>> it's a win :)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 12:16 PM Jean-Marc Paulin <
>>>>>>>>>>>>>>>>>> j...@paulin.co.uk> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Gabor,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I thought so. I was hoping for a way to read the
>>>>>>>>>>>>>>>>>>> savepoint in pages, instead of as a single blob up
>>>>>>>>>>>>>>>>>>> front, which I think is what the hashmap does... we just
>>>>>>>>>>>>>>>>>>> want to be called for each entry and extract the bit we
>>>>>>>>>>>>>>>>>>> want in that scenario.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Never mind
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thank you for the insight. Saves me a lot of hunting for
>>>>>>>>>>>>>>>>>>> nothing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:45 AM Gabor Somogyi <
>>>>>>>>>>>>>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Jean-Marc,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We've already realized that the RocksDB approach is not
>>>>>>>>>>>>>>>>>>>> reaching the performance it should. There is an open issue
>>>>>>>>>>>>>>>>>>>> for it [1].
>>>>>>>>>>>>>>>>>>>> The hashmap-based approach has always required more memory,
>>>>>>>>>>>>>>>>>>>> so if the memory footprint is a hard requirement then RocksDB
>>>>>>>>>>>>>>>>>>>> is the only way for now.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Sad to say, but I can't suggest any nifty trick to make it
>>>>>>>>>>>>>>>>>>>> better. All I can promise is that I'm now measuring the
>>>>>>>>>>>>>>>>>>>> performance of the RocksDB approach and intend to eliminate
>>>>>>>>>>>>>>>>>>>> the slowness. Since we don't know what exactly causes the
>>>>>>>>>>>>>>>>>>>> slowness, the new Frocksdb-8.10.0 may also be an improvement.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> All in all it will take some time to sort this out.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-37109
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> BR,
>>>>>>>>>>>>>>>>>>>> G
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Feb 4, 2025 at 10:29 AM Jean-Marc Paulin <
>>>>>>>>>>>>>>>>>>>> jm.pau...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What would be the best approach to read a savepoint and
>>>>>>>>>>>>>>>>>>>>> minimise the memory consumption? We just need to transform
>>>>>>>>>>>>>>>>>>>>> it into something else for investigation.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Our Flink 1.20 streaming job is using the HashMap backend,
>>>>>>>>>>>>>>>>>>>>> and is spread over 6 task slots in 6 pods (under k8s).
>>>>>>>>>>>>>>>>>>>>> Savepoints are saved on S3. A savepoint can be 4-5 GB or more.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The reader is more basic, using a local execution
>>>>>>>>>>>>>>>>>>>>> environment. This is essentially what we are doing:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>>>>>>>>>>>>         LocalStreamEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>>>     env.setParallelism(1);
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     SavepointReader savepoint =
>>>>>>>>>>>>>>>>>>>>>         SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
>>>>>>>>>>>>>>>>>>>>>     // SavepointReader.read(env, savepointLocation,
>>>>>>>>>>>>>>>>>>>>>     //     new EmbeddedRocksDBStateBackend()); // Too slow
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
>>>>>>>>>>>>>>>>>>>>>         savepoint.readKeyedState(
>>>>>>>>>>>>>>>>>>>>>             MAIN_OPERATOR,
>>>>>>>>>>>>>>>>>>>>>             new StateManagerJsonNodeReaderFunction<>(StateManager.class));
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
>>>>>>>>>>>>>>>>>>>>>         mainOperatorState.executeAndCollect();
>>>>>>>>>>>>>>>>>>>>>     stateReader.forEachRemaining(record -> {
>>>>>>>>>>>>>>>>>>>>>         // ... extract what we need here
>>>>>>>>>>>>>>>>>>>>>     });
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We tried two approaches:
>>>>>>>>>>>>>>>>>>>>> - One is to read the savepoint with a RocksDB backend. That
>>>>>>>>>>>>>>>>>>>>> works and is low on memory usage, but is very, very slow. We
>>>>>>>>>>>>>>>>>>>>> noticed the iterator is available very early on, but it is
>>>>>>>>>>>>>>>>>>>>> slow...
>>>>>>>>>>>>>>>>>>>>> - The other is to read the savepoint with a HashMap backend.
>>>>>>>>>>>>>>>>>>>>> That is very fast, as expected. However, the iterator
>>>>>>>>>>>>>>>>>>>>> apparently only returns once the whole savepoint has been
>>>>>>>>>>>>>>>>>>>>> loaded into the HashMap, hence the heavy memory consumption.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Is there a better way to do that? Or a way to tune it so
>>>>>>>>>>>>>>>>>>>>> that it does not consume all the memory? Or maybe reading it
>>>>>>>>>>>>>>>>>>>>> in parts...
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> JM
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
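
A side note on the reader loop quoted above: executeAndCollect() hands back a
CloseableIterator, so the consumer is expected to close it, and the loop only
stays light on memory on the consumer side if it keeps aggregates rather than
the records themselves. Below is a minimal, Flink-free sketch of that
consumption pattern in plain Java. ResourceIterator, Entry, Summary, scan and
over are hypothetical names made up for this illustration; ResourceIterator
only mimics the shape of Flink's CloseableIterator (an Iterator that is also
AutoCloseable). The sketch says nothing about how the state backend itself
loads the savepoint.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class SavepointEntryScan {

    /** Hypothetical stand-in for an iterator that owns resources and must be closed. */
    interface ResourceIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrowed so that callers need no checked-exception handling
    }

    /** Hypothetical entry type; a real job would use its own state item class. */
    static final class Entry {
        final String key;
        final String payload;

        Entry(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    /** The only data kept after the scan: two counters, no entries. */
    static final class Summary {
        final long entries;
        final long payloadChars;

        Summary(long entries, long payloadChars) {
            this.entries = entries;
            this.payloadChars = payloadChars;
        }
    }

    /** Wraps a plain iterator and runs onClose each time close() is called. */
    static ResourceIterator<Entry> over(Iterator<Entry> source, Runnable onClose) {
        return new ResourceIterator<Entry>() {
            @Override public boolean hasNext() { return source.hasNext(); }
            @Override public Entry next() { return source.next(); }
            @Override public void close() { onClose.run(); }
        };
    }

    /** Visits entries one at a time, extracts what is needed, and always closes the iterator. */
    static Summary scan(ResourceIterator<Entry> entries) {
        long count = 0;
        long chars = 0;
        try (ResourceIterator<Entry> it = entries) {
            while (it.hasNext()) {
                Entry e = it.next();          // only this one entry is referenced here
                count++;
                chars += e.payload.length();  // "extract the bit we want"
            }
        }
        return new Summary(count, chars);
    }

    public static void main(String[] args) {
        List<Entry> sample = Arrays.asList(new Entry("k1", "aaaa"), new Entry("k2", "bb"));
        Summary s = scan(over(sample.iterator(), () -> System.out.println("iterator closed")));
        System.out.println("entries=" + s.entries + ", payloadChars=" + s.payloadChars);
    }
}
```

As far as I know, close() on the real CloseableIterator declares a checked
Exception, so the same try-with-resources block would need a catch or a throws
clause there.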
