StefanRRichter commented on a change in pull request #8611: 
[FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable

 File path: 
 @@ -219,985 +43,41 @@
                InternalKeyContext<K> keyContext,
                RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
                TypeSerializer<K> keySerializer) {
-               this(keyContext, metaInfo, DEFAULT_CAPACITY, keySerializer);
-       }
-       /**
-        * Constructs a new {@code StateTable} instance with the specified 
-        *
-        * @param keyContext    the key context.
-        * @param metaInfo      the meta information, including the type 
serializer for state copy-on-write.
-        * @param capacity      the initial capacity of this hash map.
-        * @param keySerializer the serializer of the key.
-        * @throws IllegalArgumentException when the capacity is less than zero.
-        */
-       @SuppressWarnings("unchecked")
-       private CopyOnWriteStateTable(
-               InternalKeyContext<K> keyContext,
-               RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
-               int capacity,
-               TypeSerializer<K> keySerializer) {
                super(keyContext, metaInfo, keySerializer);
-               // initialized tables to EMPTY_TABLE.
-               this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
-               this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) 
-               // initialize sizes to 0.
-               this.primaryTableSize = 0;
-               this.incrementalRehashTableSize = 0;
-               this.rehashIndex = 0;
-               this.stateTableVersion = 0;
-               this.highestRequiredSnapshotVersion = 0;
-               this.snapshotVersions = new TreeSet<>();
-               if (capacity < 0) {
-                       throw new IllegalArgumentException("Capacity: " + 
-               }
-               if (capacity == 0) {
-                       threshold = -1;
-                       return;
-               }
-               if (capacity < MINIMUM_CAPACITY) {
-                       capacity = MINIMUM_CAPACITY;
-               } else if (capacity > MAXIMUM_CAPACITY) {
-                       capacity = MAXIMUM_CAPACITY;
-               } else {
-                       capacity = MathUtils.roundUpToPowerOfTwo(capacity);
-               }
-               primaryTable = makeTable(capacity);
-       // Public API from AbstractStateTable 
-       /**
-        * Returns the total number of entries in this {@link 
CopyOnWriteStateTable}. This is the sum of both sub-tables.
-        *
-        * @return the number of entries in this {@link CopyOnWriteStateTable}.
-        */
-       @Override
-       public int size() {
-               return primaryTableSize + incrementalRehashTableSize;
-       }
-       @Override
-       public S get(K key, N namespace) {
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final int requiredVersion = highestRequiredSnapshotVersion;
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-               for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = {
-                       final K eKey = e.key;
-                       final N eNamespace = e.namespace;
-                       if ((e.hash == hash && key.equals(eKey) && 
namespace.equals(eNamespace))) {
-                               // copy-on-write check for state
-                               if (e.stateVersion < requiredVersion) {
-                                       // copy-on-write check for entry
-                                       if (e.entryVersion < requiredVersion) {
-                                               e = 
handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
-                                       }
-                                       e.stateVersion = stateTableVersion;
-                                       e.state = 
-                               }
-                               return e.state;
-                       }
-               }
-               return null;
-       }
-       @Override
-       public Stream<K> getKeys(N namespace) {
-               return, false)
-                       .filter(entry -> entry.getNamespace().equals(namespace))
-                       .map(StateEntry::getKey);
-       }
-       @Override
-       public void put(K key, int keyGroup, N namespace, S state) {
-               put(key, namespace, state);
-       }
-       @Override
-       public S get(N namespace) {
-               return get(keyContext.getCurrentKey(), namespace);
-       }
-       @Override
-       public boolean containsKey(N namespace) {
-               return containsKey(keyContext.getCurrentKey(), namespace);
-       }
-       @Override
-       public void put(N namespace, S state) {
-               put(keyContext.getCurrentKey(), namespace, state);
-       }
-       @Override
-       public S putAndGetOld(N namespace, S state) {
-               return putAndGetOld(keyContext.getCurrentKey(), namespace, 
-       }
-       @Override
-       public void remove(N namespace) {
-               remove(keyContext.getCurrentKey(), namespace);
-       }
-       @Override
-       public S removeAndGetOld(N namespace) {
-               return removeAndGetOld(keyContext.getCurrentKey(), namespace);
-       }
-       @Override
-       public <T> void transform(N namespace, T value, 
StateTransformationFunction<S, T> transformation) throws Exception {
-               transform(keyContext.getCurrentKey(), namespace, value, 
-       }
-       // Private implementation details of the API methods 
-       /**
-        * Returns whether this table contains the specified key/namespace 
composite key.
-        *
-        * @param key       the key in the composite key to search for. Not 
-        * @param namespace the namespace in the composite key to search for. 
Not null.
-        * @return {@code true} if this map contains the specified 
key/namespace composite key,
-        * {@code false} otherwise.
-        */
-       boolean containsKey(K key, N namespace) {
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-               for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = {
-                       final K eKey = e.key;
-                       final N eNamespace = e.namespace;
-                       if ((e.hash == hash && key.equals(eKey) && 
namespace.equals(eNamespace))) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-       /**
-        * Maps the specified key/namespace composite key to the specified 
value. This method should be preferred
-        * over {@link #putAndGetOld(Object, Object, Object)} (Object, Object)} 
when the caller is not interested
-        * in the old value, because this can potentially reduce copy-on-write 
-        *
-        * @param key       the key. Not null.
-        * @param namespace the namespace. Not null.
-        * @param value     the value. Can be null.
-        */
-       void put(K key, N namespace, S value) {
-               final StateTableEntry<K, N, S> e = putEntry(key, namespace);
-               e.state = value;
-               e.stateVersion = stateTableVersion;
-       }
-       /**
-        * Maps the specified key/namespace composite key to the specified 
value. Returns the previous state that was
-        * registered under the composite key.
-        *
-        * @param key       the key. Not null.
-        * @param namespace the namespace. Not null.
-        * @param value     the value. Can be null.
-        * @return the value of any previous mapping with the specified key or
-        * {@code null} if there was no such mapping.
-        */
-       S putAndGetOld(K key, N namespace, S value) {
-               final StateTableEntry<K, N, S> e = putEntry(key, namespace);
-               // copy-on-write check for state
-               S oldState = (e.stateVersion < highestRequiredSnapshotVersion) ?
-                               getStateSerializer().copy(e.state) :
-                               e.state;
-               e.state = value;
-               e.stateVersion = stateTableVersion;
-               return oldState;
-       }
-       /**
-        * Removes the mapping with the specified key/namespace composite key 
from this map. This method should be preferred
-        * over {@link #removeAndGetOld(Object, Object)} when the caller is not 
interested in the old value, because this
-        * can potentially reduce copy-on-write activity.
-        *
-        * @param key       the key of the mapping to remove. Not null.
-        * @param namespace the namespace of the mapping to remove. Not null.
-        */
-       void remove(K key, N namespace) {
-               removeEntry(key, namespace);
-       }
-       /**
-        * Removes the mapping with the specified key/namespace composite key 
from this map, returning the state that was
-        * found under the entry.
-        *
-        * @param key       the key of the mapping to remove. Not null.
-        * @param namespace the namespace of the mapping to remove. Not null.
-        * @return the value of the removed mapping or {@code null} if no 
-        * for the specified key was found.
-        */
-       S removeAndGetOld(K key, N namespace) {
-               final StateTableEntry<K, N, S> e = removeEntry(key, namespace);
-               return e != null ?
-                               // copy-on-write check for state
-                               (e.stateVersion < 
highestRequiredSnapshotVersion ?
getStateSerializer().copy(e.state) :
-                                               e.state) :
-                               null;
-       }
-       /**
-        * @param key            the key of the mapping to remove. Not null.
-        * @param namespace      the namespace of the mapping to remove. Not 
-        * @param value          the value that is the second input for the 
-        * @param transformation the transformation function to apply on the 
old state and the given value.
-        * @param <T>            type of the value that is the second input to 
the {@link StateTransformationFunction}.
-        * @throws Exception exception that happen on applying the function.
-        * @see #transform(Object, Object, StateTransformationFunction).
-        */
-       <T> void transform(
-                       K key,
-                       N namespace,
-                       T value,
-                       StateTransformationFunction<S, T> transformation) 
throws Exception {
-               final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
-               // copy-on-write check for state
-               entry.state = transformation.apply(
-                               (entry.stateVersion < 
highestRequiredSnapshotVersion) ?
getStateSerializer().copy(entry.state) :
-                                               entry.state,
-                               value);
-               entry.stateVersion = stateTableVersion;
-       }
-       /**
-        * Helper method that is the basis for operations that add mappings.
-        */
-       private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-               for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = {
-                       if (e.hash == hash && key.equals(e.key) && 
namespace.equals(e.namespace)) {
-                               // copy-on-write check for entry
-                               if (e.entryVersion < 
highestRequiredSnapshotVersion) {
-                                       e = handleChainedEntryCopyOnWrite(tab, 
index, e);
-                               }
-                               return e;
-                       }
-               }
-               ++modCount;
-               if (size() > threshold) {
-                       doubleCapacity();
-               }
-               return addNewStateTableEntry(tab, key, namespace, hash);
-       }
-       /**
-        * Helper method that is the basis for operations that remove mappings.
-        */
-       private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-               for (StateTableEntry<K, N, S> e = tab[index], prev = null; e != 
null; prev = e, e = {
-                       if (e.hash == hash && key.equals(e.key) && 
namespace.equals(e.namespace)) {
-                               if (prev == null) {
-                                       tab[index] =;
-                               } else {
-                                       // copy-on-write check for entry
-                                       if (prev.entryVersion < 
highestRequiredSnapshotVersion) {
-                                               prev = 
handleChainedEntryCopyOnWrite(tab, index, prev);
-                                       }
-                              =;
-                               }
-                               ++modCount;
-                               if (tab == primaryTable) {
-                                       --primaryTableSize;
-                               } else {
-                                       --incrementalRehashTableSize;
-                               }
-                               return e;
-                       }
-               }
-               return null;
-       }
-       private void checkKeyNamespacePreconditions(K key, N namespace) {
-               Preconditions.checkNotNull(key, "No key set. This method should 
not be called outside of a keyed context.");
-               Preconditions.checkNotNull(namespace, "Provided namespace is 
-       }
-       // Meta data setter / getter and toString 
-       @Override
-       public TypeSerializer<S> getStateSerializer() {
-               return metaInfo.getStateSerializer();
-       }
-       @Override
-       public TypeSerializer<N> getNamespaceSerializer() {
-               return metaInfo.getNamespaceSerializer();
-       }
-       @Override
-       public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
-               return metaInfo;
-       }
-       @Override
-       public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> 
metaInfo) {
-               this.metaInfo = metaInfo;
-       }
-       // Iteration  
-       @Nonnull
-       public Iterator<StateEntry<K, N, S>> iterator() {
-               return new StateEntryIterator();
-       }
-       // Private utility functions for StateTable management 
-       /**
-        * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot)
-        */
-       @VisibleForTesting
-       void releaseSnapshot(int snapshotVersion) {
-               // we guard against concurrent modifications of 
highestRequiredSnapshotVersion between snapshot and release.
-               // Only stale reads of from the result of #releaseSnapshot 
calls are ok.
-               synchronized (snapshotVersions) {
Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to 
release unknown snapshot version");
-                       highestRequiredSnapshotVersion = 
snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
-               }
-       }
-       /**
-        * Creates (combined) copy of the table arrays for a snapshot. This 
method must be called by the same Thread that
-        * does modifications to the {@link CopyOnWriteStateTable}.
-        */
-       @VisibleForTesting
-       @SuppressWarnings("unchecked")
-       StateTableEntry<K, N, S>[] snapshotTableArrays() {
-               // we guard against concurrent modifications of 
highestRequiredSnapshotVersion between snapshot and release.
-               // Only stale reads of from the result of #releaseSnapshot 
calls are ok. This is why we must call this method
-               // from the same thread that does all the modifications to the 
-               synchronized (snapshotVersions) {
-                       // increase the table version for copy-on-write and 
register the snapshot
-                       if (++stateTableVersion < 0) {
-                               // this is just a safety net against overflows, 
but should never happen in practice (i.e., only after 2^31 snapshots)
-                               throw new IllegalStateException("Version count 
overflow in CopyOnWriteStateTable. Enforcing restart.");
-                       }
-                       highestRequiredSnapshotVersion = stateTableVersion;
-                       snapshotVersions.add(highestRequiredSnapshotVersion);
-               }
-               StateTableEntry<K, N, S>[] table = primaryTable;
-               // In order to reuse the copied array as the destination array 
for the partitioned records in
-               // CopyOnWriteStateTableSnapshot#partitionByKeyGroup(), we need 
to make sure that the copied array
-               // is big enough to hold the flattened entries. In fact, given 
the current rehashing algorithm, we only
-               // need to do this check when isRehashing() is false, but in 
order to get a more robust code(in case that
-               // the rehashing algorithm may changed in the future), we do 
this check for all the case.
-               final int totalTableIndexSize = rehashIndex + table.length;
-               final int copiedArraySize = Math.max(totalTableIndexSize, 
-               final StateTableEntry<K, N, S>[] copy = new 
-               if (isRehashing()) {
-                       // consider both tables for the snapshot, the rehash 
index tells us which part of the two tables we need
-                       final int localRehashIndex = rehashIndex;
-                       final int localCopyLength = table.length - 
-                       // for the primary table, take every index >= rhIdx.
-                       System.arraycopy(table, localRehashIndex, copy, 0, 
-                       // for the new table, we are sure that two regions 
contain all the entries:
-                       // [0, rhIdx[ AND [table.length / 2, table.length / 2 + 
-                       table = incrementalRehashTable;
-                       System.arraycopy(table, 0, copy, localCopyLength, 
-                       System.arraycopy(table, table.length >>> 1, copy, 
localCopyLength + localRehashIndex, localRehashIndex);
-               } else {
-                       // we only need to copy the primary table
-                       System.arraycopy(table, 0, copy, 0, table.length);
-               }
-               return copy;
-       }
-       /**
-        * Allocate a table of the given capacity and set the threshold 
-        *
-        * @param newCapacity must be a power of two
-        */
-       private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
-               if (newCapacity < MAXIMUM_CAPACITY) {
-                       threshold = (newCapacity >> 1) + (newCapacity >> 2); // 
3/4 capacity
-               } else {
-                       if (size() > MAX_ARRAY_SIZE) {
-                               throw new IllegalStateException("Maximum 
capacity of CopyOnWriteStateTable is reached and the job " +
-                                       "cannot continue. Please consider 
scaling-out your job or using a different keyed state backend " +
-                                       "implementation!");
-                       } else {
-                               LOG.warn("Maximum capacity of 2^30 in 
StateTable reached. Cannot increase hash table size. This can " +
-                                       "lead to more collisions and lower 
performance. Please consider scaling-out your job or using a " +
-                                       "different keyed state backend 
-                               threshold = MAX_ARRAY_SIZE;
-                       }
-               }
-               @SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] 
-                               = (StateTableEntry<K, N, S>[]) new 
-               return newTable;
-       }
-       /**
-        * Creates and inserts a new {@link StateTableEntry}.
-        */
-       private StateTableEntry<K, N, S> addNewStateTableEntry(
-                       StateTableEntry<K, N, S>[] table,
-                       K key,
-                       N namespace,
-                       int hash) {
-               // small optimization that aims to avoid holding references on 
duplicate namespace objects
-               if (namespace.equals(lastNamespace)) {
-                       namespace = lastNamespace;
-               } else {
-                       lastNamespace = namespace;
-               }
-               int index = hash & (table.length - 1);
-               StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
-                               key,
-                               namespace,
-                               null,
-                               hash,
-                               table[index],
-                               stateTableVersion,
-                               stateTableVersion);
-               table[index] = newEntry;
-               if (table == primaryTable) {
-                       ++primaryTableSize;
-               } else {
-                       ++incrementalRehashTableSize;
-               }
-               return newEntry;
-       }
-       /**
-        * Select the sub-table which is responsible for entries with the given 
hash code.
-        *
-        * @param hashCode the hash code which we use to decide about the table 
that is responsible.
-        * @return the index of the sub-table that is responsible for the entry 
with the given hash code.
-        */
-       private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
-               return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? 
primaryTable : incrementalRehashTable;
-       }
-       /**
-        * Doubles the capacity of the hash table. Existing entries are placed 
-        * the correct bucket on the enlarged table. If the current capacity is,
-        * MAXIMUM_CAPACITY, this method is a no-op. Returns the table, which
-        * will be new unless we were already at MAXIMUM_CAPACITY.
-        */
-       private void doubleCapacity() {
-               // There can only be one rehash in flight. From the amount of 
incremental rehash steps we take, this should always hold.
-               Preconditions.checkState(!isRehashing(), "There is already a 
rehash in progress.");
-               StateTableEntry<K, N, S>[] oldTable = primaryTable;
-               int oldCapacity = oldTable.length;
-               if (oldCapacity == MAXIMUM_CAPACITY) {
-                       return;
-               }
-               incrementalRehashTable = makeTable(oldCapacity * 2);
-       }
-       /**
-        * Returns true, if an incremental rehash is in progress.
-        */
-       @VisibleForTesting
-       boolean isRehashing() {
-               // if we rehash, the secondary table is not empty
-               return EMPTY_TABLE != incrementalRehashTable;
-       }
-       /**
-        * Computes the hash for the composite of key and namespace and 
performs some steps of incremental rehash if
-        * incremental rehashing is in progress.
-        */
-       private int computeHashForOperationAndDoIncrementalRehash(K key, N 
namespace) {
-               checkKeyNamespacePreconditions(key, namespace);
-               if (isRehashing()) {
-                       incrementalRehash();
-               }
-               return compositeHash(key, namespace);
-       }
-       /**
-        * Runs a number of steps for incremental rehashing.
-        */
-       @SuppressWarnings("unchecked")
-       private void incrementalRehash() {
-               StateTableEntry<K, N, S>[] oldTable = primaryTable;
-               StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;
-               int oldCapacity = oldTable.length;
-               int newMask = newTable.length - 1;
-               int requiredVersion = highestRequiredSnapshotVersion;
-               int rhIdx = rehashIndex;
-               int transferred = 0;
-               // we migrate a certain minimum amount of entries from the old 
to the new table
-               while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
-                       StateTableEntry<K, N, S> e = oldTable[rhIdx];
-                       while (e != null) {
-                               // copy-on-write check for entry
-                               if (e.entryVersion < requiredVersion) {
-                                       e = new StateTableEntry<>(e, 
-                               }
-                               StateTableEntry<K, N, S> n =;
-                               int pos = e.hash & newMask;
-                      = newTable[pos];
-                               newTable[pos] = e;
-                               e = n;
-                               ++transferred;
-                       }
-                       oldTable[rhIdx] = null;
-                       if (++rhIdx == oldCapacity) {
-                               //here, the rehash is complete and we release 
resources and reset fields
-                               primaryTable = newTable;
-                               incrementalRehashTable = (StateTableEntry<K, N, 
-                               primaryTableSize += incrementalRehashTableSize;
-                               incrementalRehashTableSize = 0;
-                               rehashIndex = 0;
-                               return;
-                       }
-               }
-               // sync our local bookkeeping the with official bookkeeping 
-               primaryTableSize -= transferred;
-               incrementalRehashTableSize += transferred;
-               rehashIndex = rhIdx;
-       }
-       /**
-        * Perform copy-on-write for entry chains. We iterate the (hopefully 
and probably) still cached chain, replace
-        * all links up to the 'untilEntry', which we actually wanted to modify.
-        */
-       private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(
-                       StateTableEntry<K, N, S>[] tab,
-                       int tableIdx,
-                       StateTableEntry<K, N, S> untilEntry) {
-               final int required = highestRequiredSnapshotVersion;
-               StateTableEntry<K, N, S> current = tab[tableIdx];
-               StateTableEntry<K, N, S> copy;
-               if (current.entryVersion < required) {
-                       copy = new StateTableEntry<>(current, 
-                       tab[tableIdx] = copy;
-               } else {
-                       // nothing to do, just advance copy to current
-                       copy = current;
-               }
-               // we iterate the chain up to 'until entry'
-               while (current != untilEntry) {
-                       //advance current
-                       current =;
-                       if (current.entryVersion < required) {
-                               // copy and advance the current's copy
-                      = new StateTableEntry<>(current, 
-                               copy =;
-                       } else {
-                               // nothing to do, just advance copy to current
-                               copy = current;
-                       }
-               }
-               return copy;
-       }
-       @SuppressWarnings("unchecked")
-       private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
-               return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
-       }
-       /**
-        * Helper function that creates and scrambles a composite hash for key 
and namespace.
-        */
-       private static int compositeHash(Object key, Object namespace) {
-               // create composite key through XOR, then apply some bit-mixing 
for better distribution of skewed keys.
-               return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
+       protected CopyOnWriteStateMap<K, N, S> createStateMap() {
+               return new CopyOnWriteStateMap<>(getStateSerializer());
        // Snapshotting 
-       int getStateTableVersion() {
-               return stateTableVersion;
-       }
-        * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be 
written in checkpointing. The snapshot integrity
-        * is protected through copy-on-write from the {@link 
CopyOnWriteStateTable}. Users should call
-        * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using 
the returned object.
+        * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be 
written in checkpointing.
         * @return a snapshot from this {@link CopyOnWriteStateTable}, for 
        public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() {
-               return new CopyOnWriteStateTableSnapshot<>(this);
+               return new CopyOnWriteStateTableSnapshot<>(
+                       this,
+                       getKeySerializer().duplicate(),
+                       getNamespaceSerializer().duplicate(),
+                       getStateSerializer().duplicate(),
-       /**
-        * Releases a snapshot for this {@link CopyOnWriteStateTable}. This 
method should be called once a snapshot is no more needed,
-        * so that the {@link CopyOnWriteStateTable} can stop considering this 
snapshot for copy-on-write, thus avoiding unnecessary
-        * object creation.
-        *
-        * @param snapshotToRelease the snapshot to release, which was 
previously created by this state table.
-        */
-       void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> 
snapshotToRelease) {
-               Preconditions.checkArgument(snapshotToRelease.isOwner(this),
-                               "Cannot release snapshot which is owned by a 
different state table.");
-               releaseSnapshot(snapshotToRelease.getSnapshotVersion());
-       }
-       // StateTableEntry 
-       /**
-        * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of 
key, namespace, and state. Thereby, key and
-        * namespace together serve as a composite key for the state. This 
class also contains some management meta data for
-        * copy-on-write, a pointer to link other {@link StateTableEntry}s to a 
list, and cached hash code.
-        *
-        * @param <K> type of key.
-        * @param <N> type of namespace.
-        * @param <S> type of state.
-        */
-       @VisibleForTesting
-       protected static class StateTableEntry<K, N, S> implements 
StateEntry<K, N, S> {
-               /**
-                * The key. Assumed to be immutable and not null.
-                */
-               @Nonnull
-               final K key;
-               /**
-                * The namespace. Assumed to be immutable and not null.
-                */
-               @Nonnull
-               final N namespace;
-               /**
-                * The state. This is not final to allow exchanging the object 
for copy-on-write. Can be null.
-                */
-               @Nullable
-               S state;
-               /**
-                * Link to another {@link StateTableEntry}. This is used to 
resolve collisions in the
-                * {@link CopyOnWriteStateTable} through chaining.
-                */
-               @Nullable
-               StateTableEntry<K, N, S> next;
-               /**
-                * The version of this {@link StateTableEntry}. This is meta 
data for copy-on-write of the table structure.
-                */
-               int entryVersion;
-               /**
-                * The version of the state object in this entry. This is meta 
data for copy-on-write of the state object itself.
-                */
-               int stateVersion;
-               /**
-                * The computed secondary hash for the composite of key and 
-                */
-               final int hash;
-               StateTableEntry(StateTableEntry<K, N, S> other, int 
entryVersion) {
-                       this(other.key, other.namespace, other.state, 
other.hash,, entryVersion, other.stateVersion);
-               }
-               StateTableEntry(
-                       @Nonnull K key,
-                       @Nonnull N namespace,
-                       @Nullable S state,
-                       int hash,
-                       @Nullable StateTableEntry<K, N, S> next,
-                       int entryVersion,
-                       int stateVersion) {
-                       this.key = key;
-                       this.namespace = namespace;
-                       this.hash = hash;
-              = next;
-                       this.entryVersion = entryVersion;
-                       this.state = state;
-                       this.stateVersion = stateVersion;
-               }
-               public final void setState(@Nullable S value, int mapVersion) {
-                       // naturally, we can update the state version every 
time we replace the old state with a different object
-                       if (value != state) {
-                               this.state = value;
-                               this.stateVersion = mapVersion;
+       CopyOnWriteStateMapSnapshot<K, N, S>[] getStateMapSnapshotArray() {
+               CopyOnWriteStateMapSnapshot<K, N, S>[] snapshotArray =
+                       new CopyOnWriteStateMapSnapshot[state.length];
 Review comment:
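   I would suggest either suppressing the unchecked-assignment warning here (Java
cannot create a generic array, so `new CopyOnWriteStateMapSnapshot[...]` is a raw
array creation) or, since this code path is not performance-critical, simply using
a `List`/`ArrayList` instead of an array.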

