What would be the best approach to read a savepoint while minimising memory
consumption? We just need to transform it into something else for
investigation.

Our Flink 1.20 streaming job uses the HashMap state backend and is spread over
6 task slots in 6 pods (under k8s). Savepoints are saved on S3; a savepoint
can be 4-5 GB or more.

The reader is more basic, using a local execution environment. This is
essentially what we are doing:

    StreamExecutionEnvironment env =
        LocalStreamEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    SavepointReader savepoint =
        SavepointReader.read(env, savepointLocation, new HashMapStateBackend());
        // SavepointReader.read(env, savepointLocation,
        //     new EmbeddedRocksDBStateBackend()); // Too slow

    DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> mainOperatorState =
        savepoint.readKeyedState(
            MAIN_OPERATOR,
            new StateManagerJsonNodeReaderFunction<>(StateManager.class));

    CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> stateReader =
        mainOperatorState.executeAndCollect();
    stateReader.forEachRemaining(record -> {
        // extract what we need here
    });
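
For reference, the reader function follows the usual KeyedStateReaderFunction
pattern from the State Processor API. A simplified sketch (the key type, state
descriptor name and output type below are placeholders, not our actual
StateManagerJsonNodeReaderFunction):

    // Hypothetical simplified reader, assuming String keys and a single
    // ValueState<String>; our real function deserialises the state into JSON.
    public static class SimpleStateReaderFunction
            extends KeyedStateReaderFunction<String, Tuple2<String, String>> {

        private ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor must match the name/type declared by the streaming job
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state-manager", Types.STRING));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, String>> out)
                throws Exception {
            // One output record per key in the savepoint
            out.collect(Tuple2.of(key, state.value()));
        }
    }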


We tried two approaches:
- One is to read the savepoint with a RocksDB backend (see the sketch after
this list). That works and is low on memory usage, but it is very slow. We
noticed the iterator becomes available very early on, yet draining it takes a
long time.
- The other is to read the savepoint with a HashMap backend. That is very
fast, as expected. However, the iterator apparently only returns once the
whole savepoint has been loaded into the HashMap, hence the heavy memory
consumption.
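
Concretely, the RocksDB attempt is the same snippet as above with only the
state backend swapped (as in the commented-out line); sketched here for
completeness, with variable names changed to avoid clashing with the snippet
above:

    // RocksDB-backed read: low memory in our tests, but very slow overall
    SavepointReader rocksSavepoint =
        SavepointReader.read(env, savepointLocation, new EmbeddedRocksDBStateBackend());

    DataStream<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> rocksState =
        rocksSavepoint.readKeyedState(
            MAIN_OPERATOR,
            new StateManagerJsonNodeReaderFunction<>(StateManager.class));

    try (CloseableIterator<MainOperatorStateJsonNodeReaderFunction.KeyedStateItem> it =
             rocksState.executeAndCollect()) {
        // The iterator starts producing records almost immediately, so memory
        // stays low, but iterating over the whole 4-5 GB savepoint takes a
        // very long time.
        it.forEachRemaining(record -> {
            // extract what we need here
        });
    }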

Is there a better way to do this? Or a way to tune it so that it does not
consume all the memory? Or maybe a way to read it in parts?

Thanks

JM
