StefanRRichter commented on a change in pull request #8611: [FLINK-12693][state] Store state per key-group in CopyOnWriteStateTable URL: https://github.com/apache/flink/pull/8611#discussion_r293555221
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ########## @@ -219,985 +43,41 @@ InternalKeyContext<K> keyContext, RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo, TypeSerializer<K> keySerializer) { - this(keyContext, metaInfo, DEFAULT_CAPACITY, keySerializer); - } - - /** - * Constructs a new {@code StateTable} instance with the specified capacity. - * - * @param keyContext the key context. - * @param metaInfo the meta information, including the type serializer for state copy-on-write. - * @param capacity the initial capacity of this hash map. - * @param keySerializer the serializer of the key. - * @throws IllegalArgumentException when the capacity is less than zero. - */ - @SuppressWarnings("unchecked") - private CopyOnWriteStateTable( - InternalKeyContext<K> keyContext, - RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo, - int capacity, - TypeSerializer<K> keySerializer) { super(keyContext, metaInfo, keySerializer); - - // initialized tables to EMPTY_TABLE. - this.primaryTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE; - this.incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE; - - // initialize sizes to 0. - this.primaryTableSize = 0; - this.incrementalRehashTableSize = 0; - - this.rehashIndex = 0; - this.stateTableVersion = 0; - this.highestRequiredSnapshotVersion = 0; - this.snapshotVersions = new TreeSet<>(); - - if (capacity < 0) { - throw new IllegalArgumentException("Capacity: " + capacity); - } - - if (capacity == 0) { - threshold = -1; - return; - } - - if (capacity < MINIMUM_CAPACITY) { - capacity = MINIMUM_CAPACITY; - } else if (capacity > MAXIMUM_CAPACITY) { - capacity = MAXIMUM_CAPACITY; - } else { - capacity = MathUtils.roundUpToPowerOfTwo(capacity); - } - primaryTable = makeTable(capacity); } - // Public API from AbstractStateTable ------------------------------------------------------------------------------ - - /** - * Returns the total number of entries in this {@link CopyOnWriteStateTable}. This is the sum of both sub-tables. - * - * @return the number of entries in this {@link CopyOnWriteStateTable}. - */ - @Override - public int size() { - return primaryTableSize + incrementalRehashTableSize; - } - - @Override - public S get(K key, N namespace) { - - final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); - final int requiredVersion = highestRequiredSnapshotVersion; - final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); - int index = hash & (tab.length - 1); - - for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) { - final K eKey = e.key; - final N eNamespace = e.namespace; - if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) { - - // copy-on-write check for state - if (e.stateVersion < requiredVersion) { - // copy-on-write check for entry - if (e.entryVersion < requiredVersion) { - e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e); - } - e.stateVersion = stateTableVersion; - e.state = getStateSerializer().copy(e.state); - } - - return e.state; - } - } - - return null; - } - - @Override - public Stream<K> getKeys(N namespace) { - return StreamSupport.stream(spliterator(), false) - .filter(entry -> entry.getNamespace().equals(namespace)) - .map(StateEntry::getKey); - } - - @Override - public void put(K key, int keyGroup, N namespace, S state) { - put(key, namespace, state); - } - - @Override - public S get(N namespace) { - return get(keyContext.getCurrentKey(), namespace); - } - - @Override - public boolean containsKey(N namespace) { - return containsKey(keyContext.getCurrentKey(), namespace); - } - - @Override - public void put(N namespace, S state) { - put(keyContext.getCurrentKey(), namespace, state); - } - - @Override - public S putAndGetOld(N namespace, S state) { - return putAndGetOld(keyContext.getCurrentKey(), namespace, state); - } - - @Override - public void remove(N namespace) { - remove(keyContext.getCurrentKey(), namespace); - } - - @Override - public S removeAndGetOld(N namespace) { - return removeAndGetOld(keyContext.getCurrentKey(), namespace); - } - - @Override - public <T> void transform(N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception { - transform(keyContext.getCurrentKey(), namespace, value, transformation); - } - - // Private implementation details of the API methods --------------------------------------------------------------- - - /** - * Returns whether this table contains the specified key/namespace composite key. - * - * @param key the key in the composite key to search for. Not null. - * @param namespace the namespace in the composite key to search for. Not null. - * @return {@code true} if this map contains the specified key/namespace composite key, - * {@code false} otherwise. - */ - boolean containsKey(K key, N namespace) { - - final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); - final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); - int index = hash & (tab.length - 1); - - for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) { - final K eKey = e.key; - final N eNamespace = e.namespace; - - if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) { - return true; - } - } - return false; - } - - /** - * Maps the specified key/namespace composite key to the specified value. This method should be preferred - * over {@link #putAndGetOld(Object, Object, Object)} (Object, Object)} when the caller is not interested - * in the old value, because this can potentially reduce copy-on-write activity. - * - * @param key the key. Not null. - * @param namespace the namespace. Not null. - * @param value the value. Can be null. - */ - void put(K key, N namespace, S value) { - final StateTableEntry<K, N, S> e = putEntry(key, namespace); - - e.state = value; - e.stateVersion = stateTableVersion; - } - - /** - * Maps the specified key/namespace composite key to the specified value. Returns the previous state that was - * registered under the composite key. - * - * @param key the key. Not null. - * @param namespace the namespace. Not null. - * @param value the value. Can be null. - * @return the value of any previous mapping with the specified key or - * {@code null} if there was no such mapping. - */ - S putAndGetOld(K key, N namespace, S value) { - - final StateTableEntry<K, N, S> e = putEntry(key, namespace); - - // copy-on-write check for state - S oldState = (e.stateVersion < highestRequiredSnapshotVersion) ? - getStateSerializer().copy(e.state) : - e.state; - - e.state = value; - e.stateVersion = stateTableVersion; - - return oldState; - } - - /** - * Removes the mapping with the specified key/namespace composite key from this map. This method should be preferred - * over {@link #removeAndGetOld(Object, Object)} when the caller is not interested in the old value, because this - * can potentially reduce copy-on-write activity. - * - * @param key the key of the mapping to remove. Not null. - * @param namespace the namespace of the mapping to remove. Not null. - */ - void remove(K key, N namespace) { - removeEntry(key, namespace); - } - - /** - * Removes the mapping with the specified key/namespace composite key from this map, returning the state that was - * found under the entry. - * - * @param key the key of the mapping to remove. Not null. - * @param namespace the namespace of the mapping to remove. Not null. - * @return the value of the removed mapping or {@code null} if no mapping - * for the specified key was found. - */ - S removeAndGetOld(K key, N namespace) { - - final StateTableEntry<K, N, S> e = removeEntry(key, namespace); - - return e != null ? - // copy-on-write check for state - (e.stateVersion < highestRequiredSnapshotVersion ? - getStateSerializer().copy(e.state) : - e.state) : - null; - } - - /** - * @param key the key of the mapping to remove. Not null. - * @param namespace the namespace of the mapping to remove. Not null. - * @param value the value that is the second input for the transformation. - * @param transformation the transformation function to apply on the old state and the given value. - * @param <T> type of the value that is the second input to the {@link StateTransformationFunction}. - * @throws Exception exception that happen on applying the function. - * @see #transform(Object, Object, StateTransformationFunction). - */ - <T> void transform( - K key, - N namespace, - T value, - StateTransformationFunction<S, T> transformation) throws Exception { - - final StateTableEntry<K, N, S> entry = putEntry(key, namespace); - - // copy-on-write check for state - entry.state = transformation.apply( - (entry.stateVersion < highestRequiredSnapshotVersion) ? - getStateSerializer().copy(entry.state) : - entry.state, - value); - entry.stateVersion = stateTableVersion; - } - - /** - * Helper method that is the basis for operations that add mappings. - */ - private StateTableEntry<K, N, S> putEntry(K key, N namespace) { - - final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); - final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); - int index = hash & (tab.length - 1); - - for (StateTableEntry<K, N, S> e = tab[index]; e != null; e = e.next) { - if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) { - - // copy-on-write check for entry - if (e.entryVersion < highestRequiredSnapshotVersion) { - e = handleChainedEntryCopyOnWrite(tab, index, e); - } - - return e; - } - } - - ++modCount; - if (size() > threshold) { - doubleCapacity(); - } - - return addNewStateTableEntry(tab, key, namespace, hash); - } - - /** - * Helper method that is the basis for operations that remove mappings. - */ - private StateTableEntry<K, N, S> removeEntry(K key, N namespace) { - - final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace); - final StateTableEntry<K, N, S>[] tab = selectActiveTable(hash); - int index = hash & (tab.length - 1); - - for (StateTableEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) { - if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) { - if (prev == null) { - tab[index] = e.next; - } else { - // copy-on-write check for entry - if (prev.entryVersion < highestRequiredSnapshotVersion) { - prev = handleChainedEntryCopyOnWrite(tab, index, prev); - } - prev.next = e.next; - } - ++modCount; - if (tab == primaryTable) { - --primaryTableSize; - } else { - --incrementalRehashTableSize; - } - return e; - } - } - return null; - } - - private void checkKeyNamespacePreconditions(K key, N namespace) { - Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context."); - Preconditions.checkNotNull(namespace, "Provided namespace is null."); - } - - // Meta data setter / getter and toString -------------------------------------------------------------------------- - - @Override - public TypeSerializer<S> getStateSerializer() { - return metaInfo.getStateSerializer(); - } - - @Override - public TypeSerializer<N> getNamespaceSerializer() { - return metaInfo.getNamespaceSerializer(); - } - - @Override - public RegisteredKeyValueStateBackendMetaInfo<N, S> getMetaInfo() { - return metaInfo; - } - - @Override - public void setMetaInfo(RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo) { - this.metaInfo = metaInfo; - } - - // Iteration ------------------------------------------------------------------------------------------------------ - - @Nonnull @Override - public Iterator<StateEntry<K, N, S>> iterator() { - return new StateEntryIterator(); - } - - // Private utility functions for StateTable management ------------------------------------------------------------- - - /** - * @see #releaseSnapshot(CopyOnWriteStateTableSnapshot) - */ - @VisibleForTesting - void releaseSnapshot(int snapshotVersion) { - // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release. - // Only stale reads of from the result of #releaseSnapshot calls are ok. - synchronized (snapshotVersions) { - Preconditions.checkState(snapshotVersions.remove(snapshotVersion), "Attempt to release unknown snapshot version"); - highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last(); - } - } - - /** - * Creates (combined) copy of the table arrays for a snapshot. This method must be called by the same Thread that - * does modifications to the {@link CopyOnWriteStateTable}. - */ - @VisibleForTesting - @SuppressWarnings("unchecked") - StateTableEntry<K, N, S>[] snapshotTableArrays() { - - // we guard against concurrent modifications of highestRequiredSnapshotVersion between snapshot and release. - // Only stale reads of from the result of #releaseSnapshot calls are ok. This is why we must call this method - // from the same thread that does all the modifications to the table. - synchronized (snapshotVersions) { - - // increase the table version for copy-on-write and register the snapshot - if (++stateTableVersion < 0) { - // this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots) - throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart."); - } - - highestRequiredSnapshotVersion = stateTableVersion; - snapshotVersions.add(highestRequiredSnapshotVersion); - } - - StateTableEntry<K, N, S>[] table = primaryTable; - - // In order to reuse the copied array as the destination array for the partitioned records in - // CopyOnWriteStateTableSnapshot#partitionByKeyGroup(), we need to make sure that the copied array - // is big enough to hold the flattened entries. In fact, given the current rehashing algorithm, we only - // need to do this check when isRehashing() is false, but in order to get a more robust code(in case that - // the rehashing algorithm may changed in the future), we do this check for all the case. - final int totalTableIndexSize = rehashIndex + table.length; - final int copiedArraySize = Math.max(totalTableIndexSize, size()); - final StateTableEntry<K, N, S>[] copy = new StateTableEntry[copiedArraySize]; - - if (isRehashing()) { - // consider both tables for the snapshot, the rehash index tells us which part of the two tables we need - final int localRehashIndex = rehashIndex; - final int localCopyLength = table.length - localRehashIndex; - // for the primary table, take every index >= rhIdx. - System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength); - - // for the new table, we are sure that two regions contain all the entries: - // [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[ - table = incrementalRehashTable; - System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex); - System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex); - } else { - // we only need to copy the primary table - System.arraycopy(table, 0, copy, 0, table.length); - } - - return copy; - } - - /** - * Allocate a table of the given capacity and set the threshold accordingly. - * - * @param newCapacity must be a power of two - */ - private StateTableEntry<K, N, S>[] makeTable(int newCapacity) { - - if (newCapacity < MAXIMUM_CAPACITY) { - threshold = (newCapacity >> 1) + (newCapacity >> 2); // 3/4 capacity - } else { - if (size() > MAX_ARRAY_SIZE) { - - throw new IllegalStateException("Maximum capacity of CopyOnWriteStateTable is reached and the job " + - "cannot continue. Please consider scaling-out your job or using a different keyed state backend " + - "implementation!"); - } else { - - LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can " + - "lead to more collisions and lower performance. Please consider scaling-out your job or using a " + - "different keyed state backend implementation!"); - threshold = MAX_ARRAY_SIZE; - } - } - - @SuppressWarnings("unchecked") StateTableEntry<K, N, S>[] newTable - = (StateTableEntry<K, N, S>[]) new StateTableEntry[newCapacity]; - return newTable; - } - - /** - * Creates and inserts a new {@link StateTableEntry}. - */ - private StateTableEntry<K, N, S> addNewStateTableEntry( - StateTableEntry<K, N, S>[] table, - K key, - N namespace, - int hash) { - - // small optimization that aims to avoid holding references on duplicate namespace objects - if (namespace.equals(lastNamespace)) { - namespace = lastNamespace; - } else { - lastNamespace = namespace; - } - - int index = hash & (table.length - 1); - StateTableEntry<K, N, S> newEntry = new StateTableEntry<>( - key, - namespace, - null, - hash, - table[index], - stateTableVersion, - stateTableVersion); - table[index] = newEntry; - - if (table == primaryTable) { - ++primaryTableSize; - } else { - ++incrementalRehashTableSize; - } - return newEntry; - } - - /** - * Select the sub-table which is responsible for entries with the given hash code. - * - * @param hashCode the hash code which we use to decide about the table that is responsible. - * @return the index of the sub-table that is responsible for the entry with the given hash code. - */ - private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode) { - return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable; - } - - /** - * Doubles the capacity of the hash table. Existing entries are placed in - * the correct bucket on the enlarged table. If the current capacity is, - * MAXIMUM_CAPACITY, this method is a no-op. Returns the table, which - * will be new unless we were already at MAXIMUM_CAPACITY. - */ - private void doubleCapacity() { - - // There can only be one rehash in flight. From the amount of incremental rehash steps we take, this should always hold. - Preconditions.checkState(!isRehashing(), "There is already a rehash in progress."); - - StateTableEntry<K, N, S>[] oldTable = primaryTable; - - int oldCapacity = oldTable.length; - - if (oldCapacity == MAXIMUM_CAPACITY) { - return; - } - - incrementalRehashTable = makeTable(oldCapacity * 2); - } - - /** - * Returns true, if an incremental rehash is in progress. - */ - @VisibleForTesting - boolean isRehashing() { - // if we rehash, the secondary table is not empty - return EMPTY_TABLE != incrementalRehashTable; - } - - /** - * Computes the hash for the composite of key and namespace and performs some steps of incremental rehash if - * incremental rehashing is in progress. - */ - private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) { - - checkKeyNamespacePreconditions(key, namespace); - - if (isRehashing()) { - incrementalRehash(); - } - - return compositeHash(key, namespace); - } - - /** - * Runs a number of steps for incremental rehashing. - */ - @SuppressWarnings("unchecked") - private void incrementalRehash() { - - StateTableEntry<K, N, S>[] oldTable = primaryTable; - StateTableEntry<K, N, S>[] newTable = incrementalRehashTable; - - int oldCapacity = oldTable.length; - int newMask = newTable.length - 1; - int requiredVersion = highestRequiredSnapshotVersion; - int rhIdx = rehashIndex; - int transferred = 0; - - // we migrate a certain minimum amount of entries from the old to the new table - while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) { - - StateTableEntry<K, N, S> e = oldTable[rhIdx]; - - while (e != null) { - // copy-on-write check for entry - if (e.entryVersion < requiredVersion) { - e = new StateTableEntry<>(e, stateTableVersion); - } - StateTableEntry<K, N, S> n = e.next; - int pos = e.hash & newMask; - e.next = newTable[pos]; - newTable[pos] = e; - e = n; - ++transferred; - } - - oldTable[rhIdx] = null; - if (++rhIdx == oldCapacity) { - //here, the rehash is complete and we release resources and reset fields - primaryTable = newTable; - incrementalRehashTable = (StateTableEntry<K, N, S>[]) EMPTY_TABLE; - primaryTableSize += incrementalRehashTableSize; - incrementalRehashTableSize = 0; - rehashIndex = 0; - return; - } - } - - // sync our local bookkeeping the with official bookkeeping fields - primaryTableSize -= transferred; - incrementalRehashTableSize += transferred; - rehashIndex = rhIdx; - } - - /** - * Perform copy-on-write for entry chains. We iterate the (hopefully and probably) still cached chain, replace - * all links up to the 'untilEntry', which we actually wanted to modify. - */ - private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite( - StateTableEntry<K, N, S>[] tab, - int tableIdx, - StateTableEntry<K, N, S> untilEntry) { - - final int required = highestRequiredSnapshotVersion; - - StateTableEntry<K, N, S> current = tab[tableIdx]; - StateTableEntry<K, N, S> copy; - - if (current.entryVersion < required) { - copy = new StateTableEntry<>(current, stateTableVersion); - tab[tableIdx] = copy; - } else { - // nothing to do, just advance copy to current - copy = current; - } - - // we iterate the chain up to 'until entry' - while (current != untilEntry) { - - //advance current - current = current.next; - - if (current.entryVersion < required) { - // copy and advance the current's copy - copy.next = new StateTableEntry<>(current, stateTableVersion); - copy = copy.next; - } else { - // nothing to do, just advance copy to current - copy = current; - } - } - - return copy; - } - - @SuppressWarnings("unchecked") - private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() { - return (StateTableEntry<K, N, S>) ITERATOR_BOOTSTRAP_ENTRY; - } - - /** - * Helper function that creates and scrambles a composite hash for key and namespace. - */ - private static int compositeHash(Object key, Object namespace) { - // create composite key through XOR, then apply some bit-mixing for better distribution of skewed keys. - return MathUtils.bitMix(key.hashCode() ^ namespace.hashCode()); + protected CopyOnWriteStateMap<K, N, S> createStateMap() { + return new CopyOnWriteStateMap<>(getStateSerializer()); } // Snapshotting ---------------------------------------------------------------------------------------------------- - int getStateTableVersion() { - return stateTableVersion; - } - /** - * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be written in checkpointing. The snapshot integrity - * is protected through copy-on-write from the {@link CopyOnWriteStateTable}. Users should call - * {@link #releaseSnapshot(CopyOnWriteStateTableSnapshot)} after using the returned object. + * Creates a snapshot of this {@link CopyOnWriteStateTable}, to be written in checkpointing. * * @return a snapshot from this {@link CopyOnWriteStateTable}, for checkpointing. */ @Nonnull @Override public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() { - return new CopyOnWriteStateTableSnapshot<>(this); + return new CopyOnWriteStateTableSnapshot<>( + this, + getKeySerializer().duplicate(), + getNamespaceSerializer().duplicate(), + getStateSerializer().duplicate(), + getMetaInfo().getStateSnapshotTransformFactory().createForDeserializedState().orElse(null)); } - /** - * Releases a snapshot for this {@link CopyOnWriteStateTable}. This method should be called once a snapshot is no more needed, - * so that the {@link CopyOnWriteStateTable} can stop considering this snapshot for copy-on-write, thus avoiding unnecessary - * object creation. - * - * @param snapshotToRelease the snapshot to release, which was previously created by this state table. - */ - void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) { - - Preconditions.checkArgument(snapshotToRelease.isOwner(this), - "Cannot release snapshot which is owned by a different state table."); - - releaseSnapshot(snapshotToRelease.getSnapshotVersion()); - } - - // StateTableEntry ------------------------------------------------------------------------------------------------- - - /** - * One entry in the {@link CopyOnWriteStateTable}. This is a triplet of key, namespace, and state. Thereby, key and - * namespace together serve as a composite key for the state. This class also contains some management meta data for - * copy-on-write, a pointer to link other {@link StateTableEntry}s to a list, and cached hash code. - * - * @param <K> type of key. - * @param <N> type of namespace. - * @param <S> type of state. - */ - @VisibleForTesting - protected static class StateTableEntry<K, N, S> implements StateEntry<K, N, S> { - - /** - * The key. Assumed to be immutable and not null. - */ - @Nonnull - final K key; - - /** - * The namespace. Assumed to be immutable and not null. - */ - @Nonnull - final N namespace; - - /** - * The state. This is not final to allow exchanging the object for copy-on-write. Can be null. - */ - @Nullable - S state; - - /** - * Link to another {@link StateTableEntry}. This is used to resolve collisions in the - * {@link CopyOnWriteStateTable} through chaining. - */ - @Nullable - StateTableEntry<K, N, S> next; - - /** - * The version of this {@link StateTableEntry}. This is meta data for copy-on-write of the table structure. - */ - int entryVersion; - - /** - * The version of the state object in this entry. This is meta data for copy-on-write of the state object itself. - */ - int stateVersion; - - /** - * The computed secondary hash for the composite of key and namespace. - */ - final int hash; - - StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) { - this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion); - } - - StateTableEntry( - @Nonnull K key, - @Nonnull N namespace, - @Nullable S state, - int hash, - @Nullable StateTableEntry<K, N, S> next, - int entryVersion, - int stateVersion) { - this.key = key; - this.namespace = namespace; - this.hash = hash; - this.next = next; - this.entryVersion = entryVersion; - this.state = state; - this.stateVersion = stateVersion; - } - - public final void setState(@Nullable S value, int mapVersion) { - // naturally, we can update the state version every time we replace the old state with a different object - if (value != state) { - this.state = value; - this.stateVersion = mapVersion; + CopyOnWriteStateMapSnapshot<K, N, S>[] getStateMapSnapshotArray() { + CopyOnWriteStateMapSnapshot<K, N, S>[] snapshotArray = + new CopyOnWriteStateMapSnapshot[state.length]; Review comment: I would suggest to suppress the warning for unchecked assignment here or, as this place is not really performance critical, rather just use array list instead of array. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services