[ https://issues.apache.org/jira/browse/FLINK-25400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784691#comment-17784691 ]
Jinzhong Li commented on FLINK-25400:
-------------------------------------

I think this issue still exists in 1.19. The task configuration of SavepointEnvironment should be passed to SavepointTaskManagerRuntimeInfo, so that SavepointEnvironment.getTaskManagerInfo().getConfiguration() can obtain the task configuration.

If my analysis is correct, I would like to fix this bug.

> RocksDBStateBackend configurations does not work with SavepointEnvironment
> --------------------------------------------------------------------------
>
>                 Key: FLINK-25400
>                 URL: https://issues.apache.org/jira/browse/FLINK-25400
>             Project: Flink
>          Issue Type: Bug
>          Components: API / State Processor
>    Affects Versions: 1.12.2
>            Reporter: wuzhiyu
>            Priority: Major
>
> Hi~
> I'm trying to use flink-state-processor-api to do state migrations by reading
> states from an existing savepoint and writing them into a new savepoint
> after certain transformations.
> However, the read rate does not meet my expectations.
> When I tried to tune RocksDB by enabling RocksDB native metrics, I found that
> it did not work.
> After some debugging, I found that when the job is running under a
> SavepointEnvironment, no RocksDBStateBackend configurations are passed to
> RocksDBStateBackend.
> The whole process is described below (the code shown is from
> release-1.12.2):
> First,
> org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is
> invoked:
>
> {code:java}
> // org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
> private StateBackend createStateBackend() throws Exception {
>     final StateBackend fromApplication =
>             configuration.getStateBackend(getUserCodeClassLoader());
>     return StateBackendLoader.fromApplicationOrConfigOrDefault(
>             fromApplication,
>             getEnvironment().getTaskManagerInfo().getConfiguration(),
>             getUserCodeClassLoader(),
>             LOG);
> } {code}
> *getEnvironment()* returns a SavepointEnvironment instance.
>
> Then
> *org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo*
> is invoked, which returns a new
> *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo* instance.
>
> {code:java}
> // org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
> @Override
> public TaskManagerRuntimeInfo getTaskManagerInfo() {
>     return new SavepointTaskManagerRuntimeInfo(getIOManager());
> } {code}
>
> Finally,
> *org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration*
> is invoked. It returns an empty configuration, so all configuration options
> are lost.
> {code:java}
> // org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
> @Override
> public Configuration getConfiguration() {
>     return new Configuration();
> } {code}
>
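
For illustration only, the fix proposed in the comment at the top of this message might look roughly like the sketch below. It is a self-contained model, not Flink code: the nested classes RuntimeInfo and Environment are hypothetical stand-ins for SavepointTaskManagerRuntimeInfo and SavepointEnvironment, a plain Map stands in for Flink's Configuration, and the option key is used only as an example of a RocksDB native-metrics setting. The point it shows is that the environment hands its task configuration to the runtime info object instead of letting it return an empty one.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the proposed fix. These are NOT the Flink classes;
// all names below are hypothetical stand-ins chosen for illustration.
class SavepointConfigSketch {

    /** Stand-in for SavepointTaskManagerRuntimeInfo. */
    static final class RuntimeInfo {
        private final Map<String, String> configuration;

        RuntimeInfo(Map<String, String> configuration) {
            // Keep a read-only copy of the configuration that was passed in,
            // rather than creating a fresh empty one on every call.
            this.configuration =
                    Collections.unmodifiableMap(new HashMap<>(configuration));
        }

        Map<String, String> getConfiguration() {
            return configuration;
        }
    }

    /** Stand-in for SavepointEnvironment. */
    static final class Environment {
        private final Map<String, String> taskConfiguration;

        Environment(Map<String, String> taskConfiguration) {
            this.taskConfiguration = taskConfiguration;
        }

        RuntimeInfo getTaskManagerInfo() {
            // The task configuration is forwarded to the runtime info.
            return new RuntimeInfo(taskConfiguration);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Example key only; any state backend option would behave the same.
        conf.put("state.backend.rocksdb.metrics.estimate-num-keys", "true");

        Environment env = new Environment(conf);
        // The option set above is visible through getTaskManagerInfo().
        System.out.println(env.getTaskManagerInfo().getConfiguration());
    }
}
```

With this shape, a caller such as the createStateBackend() method quoted above would see the user's RocksDB options when it reads getTaskManagerInfo().getConfiguration(). How the configuration should actually reach SavepointEnvironment in Flink is left open here.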