Zakelly commented on a change in pull request #16341: URL: https://github.com/apache/flink/pull/16341#discussion_r663774309
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.changelog.StateChangelogStorage; +import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; +import org.apache.flink.util.ShutdownHookUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; + +import java.util.HashMap; +import java.util.Map; + +/** This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). */ +public class TaskExecutorStateChangelogStoragesManager { + + /** Logger for this class. */ + private static final Logger LOG = + LoggerFactory.getLogger(TaskExecutorStateChangelogStoragesManager.class); + + /** + * This map holds all state changelog storages for tasks running on the task manager / executor + * that own the instance of this. Maps from job id to all the subtask's state changelog + * storages. + */ + @GuardedBy("lock") + private final Map<JobID, StateChangelogStorage<?>> changelogStoragesByJobId; + + /** Guarding lock for changelogStoragesByJobId and closed-flag. */ + private final Object lock; + + @GuardedBy("lock") + private boolean closed; + + /** shutdown hook for this manager. */ + private final Thread shutdownHook; + + public TaskExecutorStateChangelogStoragesManager() { + this.changelogStoragesByJobId = new HashMap<>(); + this.lock = new Object(); + this.closed = false; + + // register a shutdown hook + this.shutdownHook = + ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG); Review comment: The ```TaskExecutorStateChangelogStoragesManager``` behaves like ```TaskExecutorLocalStateStoresManager```. It depends on the ```releaseStateChangelogStorageForJob``` to clean up, which will be triggered in ```TaskExecutor::closeJobManagerConnectionIfNoAllocatedResources``` when the last slot of a job is released. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.changelog.StateChangelogStorage; +import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; +import org.apache.flink.util.ShutdownHookUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; + +import java.util.HashMap; +import java.util.Map; + +/** This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). */ +public class TaskExecutorStateChangelogStoragesManager { + + /** Logger for this class. */ + private static final Logger LOG = + LoggerFactory.getLogger(TaskExecutorStateChangelogStoragesManager.class); + + /** + * This map holds all state changelog storages for tasks running on the task manager / executor + * that own the instance of this. Maps from job id to all the subtask's state changelog + * storages. + */ + @GuardedBy("lock") + private final Map<JobID, StateChangelogStorage<?>> changelogStoragesByJobId; + + /** Guarding lock for changelogStoragesByJobId and closed-flag. */ + private final Object lock; + + @GuardedBy("lock") + private boolean closed; + + /** shutdown hook for this manager. */ + private final Thread shutdownHook; + + public TaskExecutorStateChangelogStoragesManager() { + this.changelogStoragesByJobId = new HashMap<>(); + this.lock = new Object(); + this.closed = false; + + // register a shutdown hook + this.shutdownHook = + ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG); + } + + public StateChangelogStorage<?> stateChangelogStorageForJob( + @Nonnull JobID jobId, Configuration configuration) { + synchronized (lock) { + if (closed) { Review comment: Good point. This is mainly about the ```closed``` and ```changelogStoragesByJobId``` synchronizing among ```stateChangelogStorageForJob```, ```releaseStateChangelogStorageForJob``` and ```shutdown```. I'm not very familiar with the thread model here, but it seems this manager will be accessed same time with the job table. So I think the lock can be removed. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java ########## @@ -18,26 +18,81 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.plugin.PluginManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; import java.util.Iterator; import java.util.ServiceLoader; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat; /** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */ @Internal public class StateChangelogStorageLoader { - private final PluginManager pluginManager; - public StateChangelogStorageLoader(PluginManager pluginManager) { - this.pluginManager = pluginManager; + private static final Logger LOG = LoggerFactory.getLogger(StateChangelogStorageLoader.class); + + /** Object used to protect calls to specific methods. */ + private static final ReentrantLock LOCK = new ReentrantLock(true); + + /** + * Mapping of state changelog storage identifier to the corresponding storage factories, + * populated in {@link StateChangelogStorageLoader#initialize(PluginManager)}. + */ + private static final HashMap<String, StateChangelogStorageFactory> + STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>(); + + static { + // Guarantee to trigger once. + initialize(null); + } + + public static void initialize(PluginManager pluginManager) { + LOCK.lock(); + try { + STATE_CHANGELOG_STORAGE_FACTORIES.clear(); + Iterator<StateChangelogStorageFactory> iterator = + pluginManager == null + ? ServiceLoader.load(StateChangelogStorageFactory.class).iterator() + : concat( + pluginManager.load(StateChangelogStorageFactory.class), + ServiceLoader.load(StateChangelogStorageFactory.class) + .iterator()); + iterator.forEachRemaining( + factory -> + STATE_CHANGELOG_STORAGE_FACTORIES.putIfAbsent( + factory.getIdentifier(), factory)); + LOG.info( + "StateChangelogStorageLoader initialized with shortcut names {{}}.", + String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet())); + } finally { + LOCK.unlock(); + } } @SuppressWarnings({"rawtypes"}) - public Iterator<StateChangelogStorage> load() { - return concat( - pluginManager.load(StateChangelogStorage.class), - ServiceLoader.load(StateChangelogStorage.class).iterator()); + public static StateChangelogStorage load(Configuration configuration) { + final String identifier = + configuration.getString(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE); + LOCK.lock(); Review comment: I'm open about this. Actually, the ```initialize()``` will be called in the early beginning of a taskmanager (**before** other threads run), and this will not change in near future. Thus, it is safe for other thread to access. However, I admit that this is a strong contract and not good for maintainance. WDYT? @Myasuka ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java ########## @@ -18,26 +18,81 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.plugin.PluginManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; import java.util.Iterator; import java.util.ServiceLoader; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat; /** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */ @Internal public class StateChangelogStorageLoader { - private final PluginManager pluginManager; - public StateChangelogStorageLoader(PluginManager pluginManager) { - this.pluginManager = pluginManager; + private static final Logger LOG = LoggerFactory.getLogger(StateChangelogStorageLoader.class); + + /** Object used to protect calls to specific methods. */ + private static final ReentrantLock LOCK = new ReentrantLock(true); + + /** + * Mapping of state changelog storage identifier to the corresponding storage factories, + * populated in {@link StateChangelogStorageLoader#initialize(PluginManager)}. + */ + private static final HashMap<String, StateChangelogStorageFactory> + STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>(); + + static { + // Guarantee to trigger once. + initialize(null); + } + + public static void initialize(PluginManager pluginManager) { + LOCK.lock(); + try { + STATE_CHANGELOG_STORAGE_FACTORIES.clear(); + Iterator<StateChangelogStorageFactory> iterator = + pluginManager == null + ? ServiceLoader.load(StateChangelogStorageFactory.class).iterator() + : concat( + pluginManager.load(StateChangelogStorageFactory.class), + ServiceLoader.load(StateChangelogStorageFactory.class) + .iterator()); + iterator.forEachRemaining( + factory -> + STATE_CHANGELOG_STORAGE_FACTORIES.putIfAbsent( + factory.getIdentifier(), factory)); + LOG.info( + "StateChangelogStorageLoader initialized with shortcut names {{}}.", + String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet())); + } finally { + LOCK.unlock(); + } } @SuppressWarnings({"rawtypes"}) - public Iterator<StateChangelogStorage> load() { - return concat( - pluginManager.load(StateChangelogStorage.class), - ServiceLoader.load(StateChangelogStorage.class).iterator()); + public static StateChangelogStorage load(Configuration configuration) { + final String identifier = + configuration.getString(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE); + LOCK.lock(); Review comment: I'm open about this. Actually, the ```initialize()``` will be called in the early beginning of a taskmanager (**before** other threads run), and this will not change in near future. Thus, it is even safe for other thread to access. However, I admit that this is a strong contract and not good for maintainance. WDYT? @Myasuka ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ########## @@ -669,13 +677,18 @@ private void stopTaskExecutorServices() throws Exception { taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); + final StateChangelogStorage<?> changelogStorage = + changelogStoragesManager.stateChangelogStorageForJob( + jobId, jobInformation.getJobConfiguration()); Review comment: I checked the code and IIUC, the configuration in ```taskInformation.getConfiguration()``` comes from the ```JobVertex.getConfiguration()``` and be set in ```TaskConfig```. It seems that no checkpoint related configs should be set here. As for the ```TaskExecutor.taskManagerConfiguration```, it is a configuration for the cluster. Since there is a default value for changelog storage, the job configuration will always override the one for cluster. I don't think we need any configuration overriding here. WDYT? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; + +/** + * A factory for {@link StateChangelogStorage}. Please use {@link StateChangelogStorageLoader} to + * create {@link StateChangelogStorage}. + */ +@Internal Review comment: @Myasuka I'll create a ticket to track this. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.changelog.StateChangelogStorage; +import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; +import org.apache.flink.util.ShutdownHookUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; + +import java.util.HashMap; +import java.util.Map; + +/** This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). */ +public class TaskExecutorStateChangelogStoragesManager { + + /** Logger for this class. */ + private static final Logger LOG = + LoggerFactory.getLogger(TaskExecutorStateChangelogStoragesManager.class); + + /** + * This map holds all state changelog storages for tasks running on the task manager / executor + * that own the instance of this. Maps from job id to all the subtask's state changelog + * storages. + */ + @GuardedBy("lock") + private final Map<JobID, StateChangelogStorage<?>> changelogStoragesByJobId; + + /** Guarding lock for changelogStoragesByJobId and closed-flag. */ + private final Object lock; + + @GuardedBy("lock") + private boolean closed; + + /** shutdown hook for this manager. */ + private final Thread shutdownHook; + + public TaskExecutorStateChangelogStoragesManager() { + this.changelogStoragesByJobId = new HashMap<>(); + this.lock = new Object(); + this.closed = false; + + // register a shutdown hook + this.shutdownHook = + ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG); + } + + public StateChangelogStorage<?> stateChangelogStorageForJob( + @Nonnull JobID jobId, Configuration configuration) { + synchronized (lock) { + if (closed) { Review comment: I have removed this lock. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java ########## @@ -28,16 +34,51 @@ /** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */ @Internal public class StateChangelogStorageLoader { - private final PluginManager pluginManager; - public StateChangelogStorageLoader(PluginManager pluginManager) { - this.pluginManager = pluginManager; + private static final Logger LOG = LoggerFactory.getLogger(StateChangelogStorageLoader.class); + + /** + * Mapping of state changelog storage identifier to the corresponding storage factories, + * populated in {@link StateChangelogStorageLoader#initialize(PluginManager)}. + */ + private static final HashMap<String, StateChangelogStorageFactory> + STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>(); + + static { + // Guarantee to trigger once. + initialize(null); } - @SuppressWarnings({"rawtypes"}) - public Iterator<StateChangelogStorage> load() { - return concat( - pluginManager.load(StateChangelogStorage.class), - ServiceLoader.load(StateChangelogStorage.class).iterator()); + public static void initialize(PluginManager pluginManager) { + STATE_CHANGELOG_STORAGE_FACTORIES.clear(); + Iterator<StateChangelogStorageFactory> iterator = + pluginManager == null + ? ServiceLoader.load(StateChangelogStorageFactory.class).iterator() + : concat( + pluginManager.load(StateChangelogStorageFactory.class), + ServiceLoader.load(StateChangelogStorageFactory.class).iterator()); + iterator.forEachRemaining( + factory -> + STATE_CHANGELOG_STORAGE_FACTORIES.putIfAbsent( + factory.getIdentifier().toLowerCase(), factory)); Review comment: We do ```clear()``` before load and ```putIfAbsent```, so initializing twice won't cause duplicated factories. Duplicated factories occur when ```PluginManager``` produces several factories with same name or ```ServiceLoader``` loads one with same name. This may happen when users put several jars in ```lib```. Since JVM will load the first class from the first jar it finds, and the later ones with same identifiers are ignored, we should do the same. However, I do agree we remind user with a WARN message. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ########## @@ -669,13 +677,18 @@ private void stopTaskExecutorServices() throws Exception { taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); + final StateChangelogStorage<?> changelogStorage = + changelogStoragesManager.stateChangelogStorageForJob( + jobId, jobInformation.getJobConfiguration()); Review comment: I agree that we could add a datastream API for ```STATE_CHANGE_LOG_STORAGE```. We could do this later when we have several implementation of storage. However, I prefer make another API instead of ```enableChangelogStateBackend("memory")```. It's not user-friendly to make user know the storage shotcut name before using changelog, especially we may have only one storage for production in most cases. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ########## @@ -669,13 +677,18 @@ private void stopTaskExecutorServices() throws Exception { taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); + final StateChangelogStorage<?> changelogStorage = + changelogStoragesManager.stateChangelogStorageForJob( + jobId, jobInformation.getJobConfiguration()); Review comment: I agree that we could add a datastream API for ```STATE_CHANGE_LOG_STORAGE```. We could do this later when we have several implementation of storage. However, I prefer make another API instead of ```enableChangelogStateBackend("memory")```. It's not user-friendly to make user know the storage shortcut name before using changelog, especially we may have only one storage for production in most cases. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java ########## @@ -18,26 +18,81 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.plugin.PluginManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; import java.util.Iterator; import java.util.ServiceLoader; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.flink.shaded.guava18.com.google.common.collect.Iterators.concat; /** A thin wrapper around {@link PluginManager} to load {@link StateChangelogStorage}. */ @Internal public class StateChangelogStorageLoader { - private final PluginManager pluginManager; - public StateChangelogStorageLoader(PluginManager pluginManager) { - this.pluginManager = pluginManager; + private static final Logger LOG = LoggerFactory.getLogger(StateChangelogStorageLoader.class); + + /** Object used to protect calls to specific methods. */ + private static final ReentrantLock LOCK = new ReentrantLock(true); + + /** + * Mapping of state changelog storage identifier to the corresponding storage factories, + * populated in {@link StateChangelogStorageLoader#initialize(PluginManager)}. + */ + private static final HashMap<String, StateChangelogStorageFactory> + STATE_CHANGELOG_STORAGE_FACTORIES = new HashMap<>(); + + static { + // Guarantee to trigger once. + initialize(null); + } + + public static void initialize(PluginManager pluginManager) { + LOCK.lock(); + try { + STATE_CHANGELOG_STORAGE_FACTORIES.clear(); + Iterator<StateChangelogStorageFactory> iterator = + pluginManager == null + ? ServiceLoader.load(StateChangelogStorageFactory.class).iterator() + : concat( + pluginManager.load(StateChangelogStorageFactory.class), + ServiceLoader.load(StateChangelogStorageFactory.class) + .iterator()); + iterator.forEachRemaining( + factory -> + STATE_CHANGELOG_STORAGE_FACTORIES.putIfAbsent( + factory.getIdentifier(), factory)); + LOG.info( + "StateChangelogStorageLoader initialized with shortcut names {{}}.", + String.join(",", STATE_CHANGELOG_STORAGE_FACTORIES.keySet())); + } finally { + LOCK.unlock(); + } } @SuppressWarnings({"rawtypes"}) - public Iterator<StateChangelogStorage> load() { - return concat( - pluginManager.load(StateChangelogStorage.class), - ServiceLoader.load(StateChangelogStorage.class).iterator()); + public static StateChangelogStorage load(Configuration configuration) { + final String identifier = + configuration.getString(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE); + LOCK.lock(); Review comment: Initializing twice may happen when JVM reuse (eg. Hadoop or unit tests). So I prefer not to change here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ########## @@ -669,13 +677,18 @@ private void stopTaskExecutorServices() throws Exception { taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); + final StateChangelogStorage<?> changelogStorage = + changelogStoragesManager.stateChangelogStorageForJob( + jobId, jobInformation.getJobConfiguration()); Review comment: Yeah, this seems good. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ########## @@ -669,13 +677,18 @@ private void stopTaskExecutorServices() throws Exception { taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); + final StateChangelogStorage<?> changelogStorage = + changelogStoragesManager.stateChangelogStorageForJob( + jobId, jobInformation.getJobConfiguration()); Review comment: Oh I get it, this seems good. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ########## @@ -669,13 +677,18 @@ private void stopTaskExecutorServices() throws Exception { taskInformation.getJobVertexId(), tdd.getSubtaskIndex()); + final StateChangelogStorage<?> changelogStorage = + changelogStoragesManager.stateChangelogStorageForJob( + jobId, jobInformation.getJobConfiguration()); Review comment: According to offline sync, we: - currently reads value from taskmanager configuration - pass value from user program and do overriding here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org