What would be the best approach to read a savepoint while minimising memory
consumption? We just need to transform it into something else for
investigation.
Our Flink 1.20 streaming job uses the HashMap state backend and is spread over
6 task slots in 6 pods (under k8s). Savepoints are saved on S3, and a savepoint
can be 4-5 GB or more.
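For context, the streaming job's state setup looks roughly like this (the bucket
name and the exact way the configuration is wired in are placeholders, not our
real values):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Default savepoint target on S3 (placeholder bucket).
conf.setString("state.savepoints.dir", "s3://<bucket>/savepoints");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setStateBackend(new HashMapStateBackend()); // keyed state lives on the JVM heap
env.setParallelism(6);                          // 6 task slots across 6 pods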
The reader is more basic and uses a local execution environment. This is
essentially what we are doing:
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
env.setParallelism(1);

SavepointReader savepoint =
        SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
        // SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend()); // Too slow

DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
        savepoint.readKeyedState(
                MAIN_OPERATOR,
                new StateManagerJsonNodeReaderFunction<>(StateManager.class));

CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
        mainOperatorState.executeAndCollect();

stateReader.forEachRemaining(record -> {
    // ... extract what we need here
});
We tried two approaches:
- One is to read the savepoint with a RocksDB backend (a sketch of that variant
follows this list). That works and is low on memory usage, but it is very slow.
We noticed the iterator becomes available very early on, but iterating through
it is slow.
- The other is to read the savepoint with a HashMap backend. That is very fast,
as expected. However, the iterator apparently only starts returning records once
the whole savepoint has been loaded into the HashMap, hence the heavy memory
consumption.
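For reference, the RocksDB variant from the first point looks roughly like the
sketch below. The tuning calls (predefined options, local DB storage path) are
illustrative assumptions, not settings we have validated:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);

// RocksDB keeps the restored state on local disk, so heap usage stays low while reading.
EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
// Illustrative tuning only (assumed values, not validated against our savepoints):
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
rocksDbBackend.setDbStoragePath("/tmp/savepoint-reader-rocksdb"); // fast local disk

SavepointReader savepoint =
        SavepointReader.read(env, savepointLocation, rocksDbBackend);
// readKeyedState / executeAndCollect as in the snippet above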
Is there a better way to do this? Or a way to tune it so that it does not
consume all the memory? Or maybe a way to read it in parts?
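To make the "reading it in parts" idea a bit more concrete, here is a purely
hypothetical sketch, not something we run. As far as we can tell, the limit
variant of executeAndCollect only yields the first N records and offers no way
to resume afterwards, so this only illustrates the kind of chunked access we
are after:

import java.util.List;

// Hypothetical: collect only the first N records instead of iterating over everything.
// This caps what comes back to the client, but it gives no way to pick up where we
// left off, and presumably the backend still restores the full operator state first.
List<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> firstChunk =
        mainOperatorState.executeAndCollect(100_000);
firstChunk.forEach(record -> {
    // extract what we need from this chunk only
});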
Thanks
JM