Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116172093
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---
    @@ -18,91 +18,137 @@
     
     package org.apache.flink.runtime.state;
     
    +import org.apache.flink.runtime.concurrent.Executors;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
     import java.util.Map;
    +import java.util.Objects;
    +import java.util.concurrent.Executor;
     
     /**
      * A {@code SharedStateRegistry} will be deployed in the 
    - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
    + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
      * maintain the reference count of {@link SharedStateHandle}s which are shared
    - * among different checkpoints.
    - *
    + * among different incremental checkpoints.
      */
     public class SharedStateRegistry {
     
        private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
     
        /** All registered state objects by an artificial key */
    -   private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
    +   private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
    +
    +   /** Executor for async state deletion */
    +   private final Executor asyncDisposalExecutor;
     
        public SharedStateRegistry() {
                this.registeredStates = new HashMap<>();
    +           this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
    --- End diff --
    
    I prefer not to use another asynchronous executor here.
    
    In my initial implementation of `SharedStateRegistry`, unreferenced shared
    states were not discarded immediately. They were returned in a list and
    then discarded outside the synchronization scope. Given that completed
    checkpoints are already discarded in an asynchronous thread in the
    `ZookeeperCompletedCheckpointStore` (which is the store more commonly used
    in practice), we can avoid using another asynchronous executor here.
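
    To make the idea concrete, here is a rough sketch of the "return a list"
    variant. It is not the code from my initial implementation or from this
    PR: `DiscardableState`, `ReturningSharedStateRegistry`, `unregisterAll`
    and the plain `String` keys are hypothetical names used only for
    illustration. The point is just that the registry updates reference
    counts under the lock and leaves the actual disposal to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a shared state handle; not the Flink interface. */
interface DiscardableState {
    void discardState() throws Exception;
}

/** Sketch of a registry that hands unreferenced states back to the caller. */
class ReturningSharedStateRegistry {

    /** A registered state together with its reference count. */
    private static final class Entry {
        final DiscardableState state;
        int referenceCount;

        Entry(DiscardableState state) {
            this.state = state;
            this.referenceCount = 1;
        }
    }

    private final Map<String, Entry> registeredStates = new HashMap<>();

    /** Adds one reference to the state stored under the given key. */
    public void register(String key, DiscardableState state) {
        synchronized (registeredStates) {
            Entry entry = registeredStates.get(key);
            if (entry == null) {
                registeredStates.put(key, new Entry(state));
            } else {
                entry.referenceCount++;
            }
        }
    }

    /**
     * Drops one reference per key and returns the states that are no longer
     * referenced. Nothing is discarded while the lock is held.
     */
    public List<DiscardableState> unregisterAll(Iterable<String> keys) {
        List<DiscardableState> unreferenced = new ArrayList<>();
        synchronized (registeredStates) {
            for (String key : keys) {
                Entry entry = registeredStates.get(key);
                if (entry == null) {
                    throw new IllegalStateException("Unknown key: " + key);
                }
                if (--entry.referenceCount == 0) {
                    registeredStates.remove(key);
                    unreferenced.add(entry.state);
                }
            }
        }
        return unreferenced;
    }
}
```

    The caller (for example the thread that already discards a completed
    checkpoint) would then loop over the returned list and call
    `discardState()` on each element, so the slow I/O happens outside the
    registry's lock and without a second executor.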
    
    What do you think?

