    +package org.apache.flink.runtime.checkpoint;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.state.CompositeStateHandle;
    +import org.apache.flink.runtime.state.SharedStateRegistry;
    +import org.apache.flink.runtime.state.StateUtil;
    +import org.apache.flink.util.Preconditions;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Set;
    +/**
    + * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
    + * can run multiple operator instances as a result of operator chaining, 
and all operator instances from the chain can
    + * register their state under their operator id. Each operator instance is 
a physical execution responsible for
    + * processing a partition of the data that goes through a logical 
operator. This partitioning happens to parallelize
    + * execution of logical operators, e.g. distributing a map function.
    + *
    + * <p>One instance of this class contains the information that one task 
will send to acknowledge a checkpoint request by
    + * the checkpoint coordinator. Tasks run operator instances in parallel, 
so the union of all
    + * {@link TaskStateSnapshot} instances that are collected by the checkpoint 
coordinator from all tasks represents the whole
    + * state of a job at the time of the checkpoint.
    + *
    + * <p>This class should be called TaskState once the old class with this 
name that we keep for backwards
    + * compatibility goes away.
    + */
    +public class TaskStateSnapshot implements CompositeStateHandle {
    +   private static final long serialVersionUID = 1L;
    +   /** Mapping from an operator id to the state of one subtask of this 
operator */
    +   private final Map<OperatorID, OperatorSubtaskState> 
    Hmm, considering default load factors and large sizes, I would pick a 
linear array scan with a hit rate of at least 30% over random-access iteration 
with a 100% hit rate. For all expected sizes in this class (which fit in 
cache), it should not matter. LinkedHashMap (LHM) also consumes a bit more 
memory. I would tend to keep it this way.
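
To illustrate why the choice should not matter functionally at these sizes, here is a minimal sketch. It uses hypothetical stand-ins (String operator ids and Long state sizes instead of OperatorID and OperatorSubtaskState) and is not the code from this PR. Both map implementations produce the same result when the entry set is iterated; only the iteration mechanics and the memory footprint differ.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapIterationSketch {

    /** Sums all values by iterating the entry set; works for any Map implementation. */
    static long totalStateSize(Map<String, Long> stateSizeByOperatorId) {
        long total = 0L;
        for (Map.Entry<String, Long> entry : stateSizeByOperatorId.entrySet()) {
            total += entry.getValue();
        }
        return total;
    }

    /** Fills the given map with a few hypothetical operator ids and state sizes. */
    static Map<String, Long> fill(Map<String, Long> target) {
        target.put("op-source", 10L);
        target.put("op-map", 20L);
        target.put("op-sink", 30L);
        return target;
    }

    public static void main(String[] args) {
        // HashMap: iteration walks the bucket array and skips empty buckets.
        Map<String, Long> hashed = fill(new HashMap<>());
        // LinkedHashMap: iteration follows a linked list of entries in insertion
        // order, at the cost of two extra references per entry.
        Map<String, Long> linked = fill(new LinkedHashMap<>());

        System.out.println(totalStateSize(hashed));
        System.out.println(totalStateSize(linked));
    }
}
```

The only observable difference in this sketch is that the LinkedHashMap guarantees insertion order on iteration, which the aggregation does not rely on.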

> FLINK-5892 introduced the job manager / checkpoint coordinator part of 
> managing state on the operator level instead of the task level by introducing 
> explicit operator_id -> state mappings. However, this explicit mapping was 
> not introduced on the task manager side, so the explicit mapping is still 
> converted into a mapping that suits the implicit operator chain order.
> We should also introduce explicit operator ids to state management on the 
> task manager.
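
The difference between the implicit chain-order mapping and an explicit operator_id -> state mapping can be sketched as follows. This is only an illustration under stated assumptions: the class, the method names, and the use of plain String values for ids and state are hypothetical and do not reflect the actual Flink types or this change.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExplicitOperatorStateSketch {

    /** Implicit scheme: state is found by the operator's position in the chain. */
    static String lookupByChainIndex(List<String> chainOrderedState, int chainIndex) {
        return chainOrderedState.get(chainIndex);
    }

    /** Explicit scheme: state is found by operator id, independent of chain order. */
    static String lookupByOperatorId(Map<String, String> stateByOperatorId, String operatorId) {
        return stateByOperatorId.get(operatorId);
    }

    public static void main(String[] args) {
        // Implicit: the list position encodes which operator the state belongs to.
        List<String> chainOrderedState = Arrays.asList("source-state", "map-state");

        // Explicit: every piece of state is registered under its operator id.
        Map<String, String> stateByOperatorId = new HashMap<>();
        stateByOperatorId.put("op-source", "source-state");
        stateByOperatorId.put("op-map", "map-state");

        // The index lookup is only correct while the chain order is unchanged.
        System.out.println(lookupByChainIndex(chainOrderedState, 1));
        // The id lookup does not depend on where the operator sits in the chain.
        System.out.println(lookupByOperatorId(stateByOperatorId, "op-map"));
    }
}
```

With the explicit map, an operator with no registered state simply yields null in this sketch, whereas the index-based lookup depends on every chained operator occupying the expected position.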

