StefanRRichter commented on a change in pull request #8611: 
[FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable
URL: https://github.com/apache/flink/pull/8611#discussion_r293555221
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
 ##########
 @@ -219,985 +43,41 @@
                InternalKeyContext<K> keyContext,
                RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
                TypeSerializer<K> keySerializer) {
-               this(keyContext, metaInfo, DEFAULT_CAPACITY, keySerializer);
-       }
-
-       /**
-        * Constructs a new {@code StateTable} instance with the specified 
capacity.
-        *
-        * @param keyContext    the key context.
-        * @param metaInfo      the meta information, including the type 
serializer for state copy-on-write.
-        * @param capacity      the initial capacity of this hash map.
-        * @param keySerializer the serializer of the key.
-        * @throws IllegalArgumentException when the capacity is less than zero.
-        */
-       @SuppressWarnings("unchecked")
-       private CopyOnWriteStateTable(
-               InternalKeyContext<K> keyContext,
-               RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
-               int capacity,
-               TypeSerializer<K> keySerializer) {
                super(keyContext, metaInfo, keySerializer);
-
-               // initialized tables to EMPTY_TABLE.
-               this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE;
-               this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) 
EMPTY_TABLE;
-
-               // initialize sizes to 0.
-               this.primaryTableSize = 0;
-               this.incrementalRehashTableSize = 0;
-
-               this.rehashIndex = 0;
-               this.stateTableVersion = 0;
-               this.highestRequiredSnapshotVersion = 0;
-               this.snapshotVersions = new TreeSet<>();
-
-               if (capacity < 0) {
-                       throw new IllegalArgumentException("Capacity: " + 
capacity);
-               }
-
-               if (capacity == 0) {
-                       threshold = -1;
-                       return;
-               }
-
-               if (capacity < MINIMUM_CAPACITY) {
-                       capacity = MINIMUM_CAPACITY;
-               } else if (capacity > MAXIMUM_CAPACITY) {
-                       capacity = MAXIMUM_CAPACITY;
-               } else {
-                       capacity = MathUtils.roundUpToPowerOfTwo(capacity);
-               }
-               primaryTable = makeTable(capacity);
        }
 
-       // Public API from AbstractStateTable 
------------------------------------------------------------------------------
-
-       /**
-        * Returns the total number of entries in this {@link 
CopyOnWriteStateTable}. This is the sum of both sub-tables.
-        *
-        * @return the number of entries in this {@link CopyOnWriteStateTable}.
-        */
-       @Override
-       public int size() {
-               return primaryTableSize + incrementalRehashTableSize;
-       }
-
-       @Override
-       public S get(K key, N namespace) {
-
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final int requiredVersion = highestRequiredSnapshotVersion;
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-
-               for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = 
e.next) {
-                       final K eKey = e.key;
-                       final N eNamespace = e.namespace;
-                       if ((e.hash == hash && key.equals(eKey) && 
namespace.equals(eNamespace))) {
-
-                               // copy-on-write check for state
-                               if (e.stateVersion < requiredVersion) {
-                                       // copy-on-write check for entry
-                                       if (e.entryVersion < requiredVersion) {
-                                               e = 
handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
-                                       }
-                                       e.stateVersion = stateTableVersion;
-                                       e.state = 
getStateSerializer().copy(e.state);
-                               }
-
-                               return e.state;
-                       }
-               }
-
-               return null;
-       }
-
-       @Override
-       public Stream<K> getKeys(N namespace) {
-               return StreamSupport.stream(spliterator(), false)
-                       .filter(entry -> entry.getNamespace().equals(namespace))
-                       .map(StateEntry::getKey);
-       }
-
-       @Override
-       public void put(K key, int keyGroup, N namespace, S state) {
-               put(key, namespace, state);
-       }
-
-       @Override
-       public S get(N namespace) {
-               return get(keyContext.getCurrentKey(), namespace);
-       }
-
-       @Override
-       public boolean containsKey(N namespace) {
-               return containsKey(keyContext.getCurrentKey(), namespace);
-       }
-
-       @Override
-       public void put(N namespace, S state) {
-               put(keyContext.getCurrentKey(), namespace, state);
-       }
-
-       @Override
-       public S putAndGetOld(N namespace, S state) {
-               return putAndGetOld(keyContext.getCurrentKey(), namespace, 
state);
-       }
-
-       @Override
-       public void remove(N namespace) {
-               remove(keyContext.getCurrentKey(), namespace);
-       }
-
-       @Override
-       public S removeAndGetOld(N namespace) {
-               return removeAndGetOld(keyContext.getCurrentKey(), namespace);
-       }
-
-       @Override
-       public <T> void transform(N namespace, T value, 
StateTransformationFunction<S, T> transformation) throws Exception {
-               transform(keyContext.getCurrentKey(), namespace, value, 
transformation);
-       }
-
-       // Private implementation details of the API methods 
---------------------------------------------------------------
-
-       /**
-        * Returns whether this table contains the specified key/namespace 
composite key.
-        *
-        * @param key       the key in the composite key to search for. Not 
null.
-        * @param namespace the namespace in the composite key to search for. 
Not null.
-        * @return {@code true} if this map contains the specified 
key/namespace composite key,
-        * {@code false} otherwise.
-        */
-       boolean containsKey(K key, N namespace) {
-
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-
-               for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = 
e.next) {
-                       final K eKey = e.key;
-                       final N eNamespace = e.namespace;
-
-                       if ((e.hash == hash && key.equals(eKey) && 
namespace.equals(eNamespace))) {
-                               return true;
-                       }
-               }
-               return false;
-       }
-
-       /**
-        * Maps the specified key/namespace composite key to the specified 
value. This method should be preferred
-        * over {@link #putAndGetOld(Object, Object, Object)} (Object, Object)} 
when the caller is not interested
-        * in the old value, because this can potentially reduce copy-on-write 
activity.
-        *
-        * @param key       the key. Not null.
-        * @param namespace the namespace. Not null.
-        * @param value     the value. Can be null.
-        */
-       void put(K key, N namespace, S value) {
-               final StateTableEntry<K, N, S> e = putEntry(key, namespace);
-
-               e.state = value;
-               e.stateVersion = stateTableVersion;
-       }
-
-       /**
-        * Maps the specified key/namespace composite key to the specified 
value. Returns the previous state that was
-        * registered under the composite key.
-        *
-        * @param key       the key. Not null.
-        * @param namespace the namespace. Not null.
-        * @param value     the value. Can be null.
-        * @return the value of any previous mapping with the specified key or
-        * {@code null} if there was no such mapping.
-        */
-       S putAndGetOld(K key, N namespace, S value) {
-
-               final StateTableEntry<K, N, S> e = putEntry(key, namespace);
-
-               // copy-on-write check for state
-               S oldState = (e.stateVersion < highestRequiredSnapshotVersion) ?
-                               getStateSerializer().copy(e.state) :
-                               e.state;
-
-               e.state = value;
-               e.stateVersion = stateTableVersion;
-
-               return oldState;
-       }
-
-       /**
-        * Removes the mapping with the specified key/namespace composite key 
from this map. This method should be preferred
-        * over {@link #removeAndGetOld(Object, Object)} when the caller is not 
interested in the old value, because this
-        * can potentially reduce copy-on-write activity.
-        *
-        * @param key       the key of the mapping to remove. Not null.
-        * @param namespace the namespace of the mapping to remove. Not null.
-        */
-       void remove(K key, N namespace) {
-               removeEntry(key, namespace);
-       }
-
-       /**
-        * Removes the mapping with the specified key/namespace composite key 
from this map, returning the state that was
-        * found under the entry.
-        *
-        * @param key       the key of the mapping to remove. Not null.
-        * @param namespace the namespace of the mapping to remove. Not null.
-        * @return the value of the removed mapping or {@code null} if no 
mapping
-        * for the specified key was found.
-        */
-       S removeAndGetOld(K key, N namespace) {
-
-               final StateTableEntry<K, N, S> e = removeEntry(key, namespace);
-
-               return e != null ?
-                               // copy-on-write check for state
-                               (e.stateVersion < 
highestRequiredSnapshotVersion ?
-                                               
getStateSerializer().copy(e.state) :
-                                               e.state) :
-                               null;
-       }
-
-       /**
-        * @param key            the key of the mapping to remove. Not null.
-        * @param namespace      the namespace of the mapping to remove. Not 
null.
-        * @param value          the value that is the second input for the 
transformation.
-        * @param transformation the transformation function to apply on the 
old state and the given value.
-        * @param <T>            type of the value that is the second input to 
the {@link StateTransformationFunction}.
-        * @throws Exception exception that happen on applying the function.
-        * @see #transform(Object, Object, StateTransformationFunction).
-        */
-       <T> void transform(
-                       K key,
-                       N namespace,
-                       T value,
-                       StateTransformationFunction<S, T> transformation) 
throws Exception {
-
-               final StateTableEntry<K, N, S> entry = putEntry(key, namespace);
-
-               // copy-on-write check for state
-               entry.state = transformation.apply(
-                               (entry.stateVersion < 
highestRequiredSnapshotVersion) ?
-                                               
getStateSerializer().copy(entry.state) :
-                                               entry.state,
-                               value);
-               entry.stateVersion = stateTableVersion;
-       }
-
-       /**
-        * Helper method that is the basis for operations that add mappings.
-        */
-       private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
-
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-
-               for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = 
e.next) {
-                       if (e.hash == hash && key.equals(e.key) && 
namespace.equals(e.namespace)) {
-
-                               // copy-on-write check for entry
-                               if (e.entryVersion < 
highestRequiredSnapshotVersion) {
-                                       e = handleChainedEntryCopyOnWrite(tab, 
index, e);
-                               }
-
-                               return e;
-                       }
-               }
-
-               ++modCount;
-               if (size() > threshold) {
-                       doubleCapacity();
-               }
-
-               return addNewStateTableEntry(tab, key, namespace, hash);
-       }
-
-       /**
-        * Helper method that is the basis for operations that remove mappings.
-        */
-       private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
-
-               final int hash = 
computeHashForOperationAndDoIncrementalRehash(key, namespace);
-               final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash);
-               int index = hash & (tab.length - 1);
-
-               for (StateTableEntry<K, N, S> e = tab[index], prev = null; e != 
null; prev = e, e = e.next) {
-                       if (e.hash == hash && key.equals(e.key) && 
namespace.equals(e.namespace)) {
-                               if (prev == null) {
-                                       tab[index] = e.next;
-                               } else {
-                                       // copy-on-write check for entry
-                                       if (prev.entryVersion < 
highestRequiredSnapshotVersion) {
-                                               prev = 
handleChainedEntryCopyOnWrite(tab, index, prev);
-                                       }
-                                       prev.next = e.next;
-                               }
-                               ++modCount;
-                               if (tab == primaryTable) {
-                                       --primaryTableSize;
-                               } else {
-                                       --incrementalRehashTableSize;
-                               }
-                               return e;
-                       }
-               }
-               return null;
-       }
-
-       private void checkKeyNamespacePreconditions(K key, N namespace) {
-               Preconditions.checkNotNull(key, "No key set. This method should 
not be called outside of a keyed context.");
-               Preconditions.checkNotNull(namespace, "Provided namespace is 
null.");
-       }
-
-       // Meta data setter / getter and toString 
--------------------------------------------------------------------------
-
-       @Override
-       public TypeSerializer<S> getStateSerializer() {
-               return metaInfo.getStateSerializer();
-       }
-
-       @Override
-       public TypeSerializer<N> getNamespaceSerializer() {
-               return metaInfo.getNamespaceSerializer();
-       }
-
-       @Override
-       public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() {
-               return metaInfo;
-       }
-
-       @Override
-       public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> 
metaInfo) {
-               this.metaInfo = metaInfo;
-       }
-
-       // Iteration  
------------------------------------------------------------------------------------------------------
-
-       @Nonnull
        @Override
-       public Iterator<StateEntry<K, N, S>> iterator() {
-               return new StateEntryIterator();
-       }
-
-       // Private utility functions for StateTable management 
-------------------------------------------------------------
-
-       /**
-        * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot)
-        */
-       @VisibleForTesting
-       void releaseSnapshot(int snapshotVersion) {
-               // we guard against concurrent modifications of 
highestRequiredSnapshotVersion between snapshot and release.
-               // Only stale reads of from the result of #releaseSnapshot 
calls are ok.
-               synchronized (snapshotVersions) {
-                       
Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to 
release unknown snapshot version");
-                       highestRequiredSnapshotVersion = 
snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
-               }
-       }
-
-       /**
-        * Creates (combined) copy of the table arrays for a snapshot. This 
method must be called by the same Thread that
-        * does modifications to the {@link CopyOnWriteStateTable}.
-        */
-       @VisibleForTesting
-       @SuppressWarnings("unchecked")
-       StateTableEntry<K, N, S>[] snapshotTableArrays() {
-
-               // we guard against concurrent modifications of 
highestRequiredSnapshotVersion between snapshot and release.
-               // Only stale reads of from the result of #releaseSnapshot 
calls are ok. This is why we must call this method
-               // from the same thread that does all the modifications to the 
table.
-               synchronized (snapshotVersions) {
-
-                       // increase the table version for copy-on-write and 
register the snapshot
-                       if (++stateTableVersion < 0) {
-                               // this is just a safety net against overflows, 
but should never happen in practice (i.e., only after 2^31 snapshots)
-                               throw new IllegalStateException("Version count 
overflow in CopyOnWriteStateTable. Enforcing restart.");
-                       }
-
-                       highestRequiredSnapshotVersion = stateTableVersion;
-                       snapshotVersions.add(highestRequiredSnapshotVersion);
-               }
-
-               StateTableEntry<K, N, S>[] table = primaryTable;
-
-               // In order to reuse the copied array as the destination array 
for the partitioned records in
-               // CopyOnWriteStateTableSnapshot#partitionByKeyGroup(), we need 
to make sure that the copied array
-               // is big enough to hold the flattened entries. In fact, given 
the current rehashing algorithm, we only
-               // need to do this check when isRehashing() is false, but in 
order to get a more robust code(in case that
-               // the rehashing algorithm may changed in the future), we do 
this check for all the case.
-               final int totalTableIndexSize = rehashIndex + table.length;
-               final int copiedArraySize = Math.max(totalTableIndexSize, 
size());
-               final StateTableEntry<K, N, S>[] copy = new 
StateTableEntry[copiedArraySize];
-
-               if (isRehashing()) {
-                       // consider both tables for the snapshot, the rehash 
index tells us which part of the two tables we need
-                       final int localRehashIndex = rehashIndex;
-                       final int localCopyLength = table.length - 
localRehashIndex;
-                       // for the primary table, take every index >= rhIdx.
-                       System.arraycopy(table, localRehashIndex, copy, 0, 
localCopyLength);
-
-                       // for the new table, we are sure that two regions 
contain all the entries:
-                       // [0, rhIdx[ AND [table.length / 2, table.length / 2 + 
rhIdx[
-                       table = incrementalRehashTable;
-                       System.arraycopy(table, 0, copy, localCopyLength, 
localRehashIndex);
-                       System.arraycopy(table, table.length >>> 1, copy, 
localCopyLength + localRehashIndex, localRehashIndex);
-               } else {
-                       // we only need to copy the primary table
-                       System.arraycopy(table, 0, copy, 0, table.length);
-               }
-
-               return copy;
-       }
-
-       /**
-        * Allocate a table of the given capacity and set the threshold 
accordingly.
-        *
-        * @param newCapacity must be a power of two
-        */
-       private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
-
-               if (newCapacity < MAXIMUM_CAPACITY) {
-                       threshold = (newCapacity >> 1) + (newCapacity >> 2); // 
3/4 capacity
-               } else {
-                       if (size() > MAX_ARRAY_SIZE) {
-
-                               throw new IllegalStateException("Maximum 
capacity of CopyOnWriteStateTable is reached and the job " +
-                                       "cannot continue. Please consider 
scaling-out your job or using a different keyed state backend " +
-                                       "implementation!");
-                       } else {
-
-                               LOG.warn("Maximum capacity of 2^30 in 
StateTable reached. Cannot increase hash table size. This can " +
-                                       "lead to more collisions and lower 
performance. Please consider scaling-out your job or using a " +
-                                       "different keyed state backend 
implementation!");
-                               threshold = MAX_ARRAY_SIZE;
-                       }
-               }
-
-               @SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] 
newTable
-                               = (StateTableEntry<K, N, S>[]) new 
StateTableEntry[newCapacity];
-               return newTable;
-       }
-
-       /**
-        * Creates and inserts a new {@link StateTableEntry}.
-        */
-       private StateTableEntry<K, N, S> addNewStateTableEntry(
-                       StateTableEntry<K, N, S>[] table,
-                       K key,
-                       N namespace,
-                       int hash) {
-
-               // small optimization that aims to avoid holding references on 
duplicate namespace objects
-               if (namespace.equals(lastNamespace)) {
-                       namespace = lastNamespace;
-               } else {
-                       lastNamespace = namespace;
-               }
-
-               int index = hash & (table.length - 1);
-               StateTableEntry<K, N, S> newEntry = new StateTableEntry<>(
-                               key,
-                               namespace,
-                               null,
-                               hash,
-                               table[index],
-                               stateTableVersion,
-                               stateTableVersion);
-               table[index] = newEntry;
-
-               if (table == primaryTable) {
-                       ++primaryTableSize;
-               } else {
-                       ++incrementalRehashTableSize;
-               }
-               return newEntry;
-       }
-
-       /**
-        * Select the sub-table which is responsible for entries with the given 
hash code.
-        *
-        * @param hashCode the hash code which we use to decide about the table 
that is responsible.
-        * @return the index of the sub-table that is responsible for the entry 
with the given hash code.
-        */
-       private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) {
-               return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? 
primaryTable : incrementalRehashTable;
-       }
-
-       /**
-        * Doubles the capacity of the hash table. Existing entries are placed 
in
-        * the correct bucket on the enlarged table. If the current capacity is,
-        * MAXIMUM_CAPACITY, this method is a no-op. Returns the table, which
-        * will be new unless we were already at MAXIMUM_CAPACITY.
-        */
-       private void doubleCapacity() {
-
-               // There can only be one rehash in flight. From the amount of 
incremental rehash steps we take, this should always hold.
-               Preconditions.checkState(!isRehashing(), "There is already a 
rehash in progress.");
-
-               StateTableEntry<K, N, S>[] oldTable = primaryTable;
-
-               int oldCapacity = oldTable.length;
-
-               if (oldCapacity == MAXIMUM_CAPACITY) {
-                       return;
-               }
-
-               incrementalRehashTable = makeTable(oldCapacity * 2);
-       }
-
-       /**
-        * Returns true, if an incremental rehash is in progress.
-        */
-       @VisibleForTesting
-       boolean isRehashing() {
-               // if we rehash, the secondary table is not empty
-               return EMPTY_TABLE != incrementalRehashTable;
-       }
-
-       /**
-        * Computes the hash for the composite of key and namespace and 
performs some steps of incremental rehash if
-        * incremental rehashing is in progress.
-        */
-       private int computeHashForOperationAndDoIncrementalRehash(K key, N 
namespace) {
-
-               checkKeyNamespacePreconditions(key, namespace);
-
-               if (isRehashing()) {
-                       incrementalRehash();
-               }
-
-               return compositeHash(key, namespace);
-       }
-
-       /**
-        * Runs a number of steps for incremental rehashing.
-        */
-       @SuppressWarnings("unchecked")
-       private void incrementalRehash() {
-
-               StateTableEntry<K, N, S>[] oldTable = primaryTable;
-               StateTableEntry<K, N, S>[] newTable = incrementalRehashTable;
-
-               int oldCapacity = oldTable.length;
-               int newMask = newTable.length - 1;
-               int requiredVersion = highestRequiredSnapshotVersion;
-               int rhIdx = rehashIndex;
-               int transferred = 0;
-
-               // we migrate a certain minimum amount of entries from the old 
to the new table
-               while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
-
-                       StateTableEntry<K, N, S> e = oldTable[rhIdx];
-
-                       while (e != null) {
-                               // copy-on-write check for entry
-                               if (e.entryVersion < requiredVersion) {
-                                       e = new StateTableEntry<>(e, 
stateTableVersion);
-                               }
-                               StateTableEntry<K, N, S> n = e.next;
-                               int pos = e.hash & newMask;
-                               e.next = newTable[pos];
-                               newTable[pos] = e;
-                               e = n;
-                               ++transferred;
-                       }
-
-                       oldTable[rhIdx] = null;
-                       if (++rhIdx == oldCapacity) {
-                               //here, the rehash is complete and we release 
resources and reset fields
-                               primaryTable = newTable;
-                               incrementalRehashTable = (StateTableEntry<K, N, 
S>[]) EMPTY_TABLE;
-                               primaryTableSize += incrementalRehashTableSize;
-                               incrementalRehashTableSize = 0;
-                               rehashIndex = 0;
-                               return;
-                       }
-               }
-
-               // sync our local bookkeeping the with official bookkeeping 
fields
-               primaryTableSize -= transferred;
-               incrementalRehashTableSize += transferred;
-               rehashIndex = rhIdx;
-       }
-
-       /**
-        * Perform copy-on-write for entry chains. We iterate the (hopefully 
and probably) still cached chain, replace
-        * all links up to the 'untilEntry', which we actually wanted to modify.
-        */
-       private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(
-                       StateTableEntry<K, N, S>[] tab,
-                       int tableIdx,
-                       StateTableEntry<K, N, S> untilEntry) {
-
-               final int required = highestRequiredSnapshotVersion;
-
-               StateTableEntry<K, N, S> current = tab[tableIdx];
-               StateTableEntry<K, N, S> copy;
-
-               if (current.entryVersion < required) {
-                       copy = new StateTableEntry<>(current, 
stateTableVersion);
-                       tab[tableIdx] = copy;
-               } else {
-                       // nothing to do, just advance copy to current
-                       copy = current;
-               }
-
-               // we iterate the chain up to 'until entry'
-               while (current != untilEntry) {
-
-                       //advance current
-                       current = current.next;
-
-                       if (current.entryVersion < required) {
-                               // copy and advance the current's copy
-                               copy.next = new StateTableEntry<>(current, 
stateTableVersion);
-                               copy = copy.next;
-                       } else {
-                               // nothing to do, just advance copy to current
-                               copy = current;
-                       }
-               }
-
-               return copy;
-       }
-
-       @SuppressWarnings("unchecked")
-       private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
-               return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY;
-       }
-
-       /**
-        * Helper function that creates and scrambles a composite hash for key 
and namespace.
-        */
-       private static int compositeHash(Object key, Object namespace) {
-               // create composite key through XOR, then apply some bit-mixing 
for better distribution of skewed keys.
-               return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode());
+       protected CopyOnWriteStateMap<K, N, S> createStateMap() {
+               return new CopyOnWriteStateMap<>(getStateSerializer());
        }
 
        // Snapshotting 
----------------------------------------------------------------------------------------------------
 
-       int getStateTableVersion() {
-               return stateTableVersion;
-       }
-
        /**
-        * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be 
written in checkpointing. The snapshot integrity
-        * is protected through copy-on-write from the {@link 
CopyOnWriteStateTable}. Users should call
-        * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using 
the returned object.
+        * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be 
written in checkpointing.
         *
         * @return a snapshot from this {@link CopyOnWriteStateTable}, for 
checkpointing.
         */
        @Nonnull
        @Override
        public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() {
-               return new CopyOnWriteStateTableSnapshot<>(this);
+               return new CopyOnWriteStateTableSnapshot<>(
+                       this,
+                       getKeySerializer().duplicate(),
+                       getNamespaceSerializer().duplicate(),
+                       getStateSerializer().duplicate(),
+                       
getMetaInfo().getStateSnapshotTransformFactory().createForDeserializedState().orElse(null));
        }
 
-       /**
-        * Releases a snapshot for this {@link CopyOnWriteStateTable}. This 
method should be called once a snapshot is no more needed,
-        * so that the {@link CopyOnWriteStateTable} can stop considering this 
snapshot for copy-on-write, thus avoiding unnecessary
-        * object creation.
-        *
-        * @param snapshotToRelease the snapshot to release, which was 
previously created by this state table.
-        */
-       void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> 
snapshotToRelease) {
-
-               Preconditions.checkArgument(snapshotToRelease.isOwner(this),
-                               "Cannot release snapshot which is owned by a 
different state table.");
-
-               releaseSnapshot(snapshotToRelease.getSnapshotVersion());
-       }
-
-       // StateTableEntry 
-------------------------------------------------------------------------------------------------
-
-       /**
-        * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of 
key, namespace, and state. Thereby, key and
-        * namespace together serve as a composite key for the state. This 
class also contains some management meta data for
-        * copy-on-write, a pointer to link other {@link StateTableEntry}s to a 
list, and cached hash code.
-        *
-        * @param <K> type of key.
-        * @param <N> type of namespace.
-        * @param <S> type of state.
-        */
-       @VisibleForTesting
-       protected static class StateTableEntry<K, N, S> implements 
StateEntry<K, N, S> {
-
-               /**
-                * The key. Assumed to be immutable and not null.
-                */
-               @Nonnull
-               final K key;
-
-               /**
-                * The namespace. Assumed to be immutable and not null.
-                */
-               @Nonnull
-               final N namespace;
-
-               /**
-                * The state. This is not final to allow exchanging the object 
for copy-on-write. Can be null.
-                */
-               @Nullable
-               S state;
-
-               /**
-                * Link to another {@link StateTableEntry}. This is used to 
resolve collisions in the
-                * {@link CopyOnWriteStateTable} through chaining.
-                */
-               @Nullable
-               StateTableEntry<K, N, S> next;
-
-               /**
-                * The version of this {@link StateTableEntry}. This is meta 
data for copy-on-write of the table structure.
-                */
-               int entryVersion;
-
-               /**
-                * The version of the state object in this entry. This is meta 
data for copy-on-write of the state object itself.
-                */
-               int stateVersion;
-
-               /**
-                * The computed secondary hash for the composite of key and 
namespace.
-                */
-               final int hash;
-
-               StateTableEntry(StateTableEntry<K, N, S> other, int 
entryVersion) {
-                       this(other.key, other.namespace, other.state, 
other.hash, other.next, entryVersion, other.stateVersion);
-               }
-
-               StateTableEntry(
-                       @Nonnull K key,
-                       @Nonnull N namespace,
-                       @Nullable S state,
-                       int hash,
-                       @Nullable StateTableEntry<K, N, S> next,
-                       int entryVersion,
-                       int stateVersion) {
-                       this.key = key;
-                       this.namespace = namespace;
-                       this.hash = hash;
-                       this.next = next;
-                       this.entryVersion = entryVersion;
-                       this.state = state;
-                       this.stateVersion = stateVersion;
-               }
-
-               public final void setState(@Nullable S value, int mapVersion) {
-                       // naturally, we can update the state version every 
time we replace the old state with a different object
-                       if (value != state) {
-                               this.state = value;
-                               this.stateVersion = mapVersion;
+       CopyOnWriteStateMapSnapshot<K, N, S>[] getStateMapSnapshotArray() {
+               CopyOnWriteStateMapSnapshot<K, N, S>[] snapshotArray =
+                       new CopyOnWriteStateMapSnapshot[state.length];
 
 Review comment:
   I would suggest to suppress the warning for unchecked assignment here or, as 
this place is not really performance critical, rather just use array list 
instead of array.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to