What would be the best approach to read a savepoint while minimising memory consumption? We just need to transform it into something else for investigation.
Our Flink 1.20 streaming job uses the HashMap state backend and is spread over 6 task slots in 6 pods (under Kubernetes). Savepoints are written to S3; a savepoint can be 4-5 GB or more. The reader itself is much simpler and uses a local execution environment.

This is essentially what we are doing:

    StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SavepointReader savepoint =
        SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
        // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow

    DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
        savepoint.readKeyedState(
            MAIN_OPERATOR,
            new StateManagerJsonNodeReaderFunction<>(StateManager.class));

    CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
        mainOperatorState.executeAndCollect();

    stateReader.forEachRemaining(
        record -> {
            ... // extract what we need here
        });

We tried two approaches:

- Read the savepoint with a RocksDB backend. That works and is low on memory usage, but it is very, very slow. We noticed the iterator becomes available very early on, but iterating over it is slow.
- Read the savepoint with a HashMap backend. That is very fast, as expected. However, the iterator apparently only starts returning records once the whole savepoint has been loaded into the HashMap, so memory consumption is heavy.

Is there a better way to do this? Or a way to tune it so that it does not consume all the memory? Or maybe a way to read it in parts? (A simplified sketch of our reader function is below, for context.)

Thanks,
JM
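PS: StateManagerJsonNodeReaderFunction is our own class and is not shown above. Below is a simplified sketch of the kind of KeyedStateReaderFunction it is, with placeholder class name, key type and state descriptor (the real descriptor name/type must match what the streaming job registers); the point is that readKey() only emits the small item we actually need per key.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
    import org.apache.flink.util.Collector;

    // Placeholder names: the real key type ("String" here) and the state
    // descriptor must match what the main operator actually registers.
    public class StateSummaryReaderFunction
            extends KeyedStateReaderFunction<String, StateSummaryReaderFunction.KeyedStateItem> {

        // Small record emitted per key; only the fields needed for the investigation.
        public static class KeyedStateItem {
            public String key;
            public String summary;
        }

        // StateManager is the job's own state class.
        private transient ValueState<StateManager> state;

        @Override
        public void open(Configuration parameters) {
            // Descriptor name and type must match the one used by the main operator.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state-manager", StateManager.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<KeyedStateItem> out) throws Exception {
            StateManager value = state.value();
            KeyedStateItem item = new KeyedStateItem();
            item.key = key;
            item.summary = value == null ? null : value.toString(); // extract what we need here
            out.collect(item);
        }
    }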