[ https://issues.apache.org/jira/browse/FLINK-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16007773#comment-16007773 ]
ASF GitHub Bot commented on FLINK-6534:
---------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116172093

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
    @@ -18,91 +18,137 @@
     package org.apache.flink.runtime.state;

    +import org.apache.flink.runtime.concurrent.Executors;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;

     import java.util.HashMap;
     import java.util.Map;
    +import java.util.Objects;
    +import java.util.concurrent.Executor;

     /**
      * A {@code SharedStateRegistry} will be deployed in the
    - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
    + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
      * maintain the reference count of {@link SharedStateHandle}s which are shared
    - * among different checkpoints.
    - *
    + * among different incremental checkpoints.
      */
     public class SharedStateRegistry {

         private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);

         /** All registered state objects by an artificial key */
    -    private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
    +    private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
    +
    +    /** Executor for async state deletion */
    +    private final Executor asyncDisposalExecutor;

         public SharedStateRegistry() {
             this.registeredStates = new HashMap<>();
    +        this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
    --- End diff --

    I prefer not to use another asynchronous executor here. In my initial implementation of `SharedStateRegistry`, unreferenced shared states are not discarded immediately and are returned in a list. These unreferenced shared states then are discarded outside the synchronization scope.
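
    Editorial sketch: the pattern described in the paragraph above (update reference counts under the lock, hand back the entries that dropped to zero, and discard them only after the lock is released) might look roughly like the following. This is a minimal illustration under stated assumptions, not the code from the pull request or from the earlier implementation mentioned above. `DeferredDisposalRegistry`, `Disposable`, `unregisterAll`, and `discardAll` are hypothetical names, and plain `String` keys stand in for the registry key type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of a reference-counting registry that defers disposal.
 * All names are hypothetical; this is not Flink's SharedStateRegistry.
 */
final class DeferredDisposalRegistry {

    /** Hypothetical stand-in for a shared state handle. */
    public interface Disposable {
        void discard() throws Exception;
    }

    /** A registered state together with its reference count. */
    private static final class Entry {
        final Disposable state;
        int referenceCount = 1;

        Entry(Disposable state) {
            this.state = state;
        }
    }

    private final Object lock = new Object();
    private final Map<String, Entry> registeredStates = new HashMap<>();

    /** Adds one reference for the key and returns the new count. */
    public int register(String key, Disposable state) {
        synchronized (lock) {
            Entry entry = registeredStates.get(key);
            if (entry == null) {
                registeredStates.put(key, new Entry(state));
                return 1;
            }
            return ++entry.referenceCount;
        }
    }

    /**
     * Drops one reference per key. Only bookkeeping happens under the lock:
     * states whose count reaches zero are collected and returned, not discarded.
     */
    public List<Disposable> unregisterAll(Iterable<String> keys) {
        List<Disposable> unreferenced = new ArrayList<>();
        synchronized (lock) {
            for (String key : keys) {
                Entry entry = registeredStates.get(key);
                if (entry != null && --entry.referenceCount == 0) {
                    registeredStates.remove(key);
                    unreferenced.add(entry.state);
                }
            }
        }
        return unreferenced;
    }

    /** Discards the given states and returns the number of failures. Call without holding the lock. */
    public static int discardAll(List<Disposable> states) {
        int failures = 0;
        for (Disposable state : states) {
            try {
                state.discard();
            } catch (Exception e) {
                failures++; // a real implementation would log the failure
            }
        }
        return failures;
    }
}
```

    In this sketch the caller decides where `discardAll` runs, for example on a thread that is already discarding a completed checkpoint, so potentially slow deletions never happen while the registry lock is held.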
    Given that the completed checkpoints are already discarded in an asynchronous thread in the `ZookeeperCompletedCheckpointStore` (which is more commonly used in practice), we can avoid the usage of another asynchronous executor here. What do you think?


> SharedStateRegistry is disposing state handles from main thread
> ---------------------------------------------------------------
>
>                 Key: FLINK-6534
>                 URL: https://issues.apache.org/jira/browse/FLINK-6534
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> Currently, the {{SharedStateRegistry}} is deleting state handles that are no
> longer referenced under the registry's lock and from the main thread. We
> should use the {{CheckpointCoordinator}}'s async IO executor to make this
> non-blocking.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)