Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5885#discussion_r184675164
  
    --- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -1116,148 +1115,177 @@ private void 
restoreKeyGroupsShardWithTemporaryHelperInstance(
        // 
------------------------------------------------------------------------
     
        /**
    -    * Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
    -    * we don't restore the individual k/v states, just the global RocksDB 
database and the
    -    * list of column families. When a k/v state is first requested we 
check here whether we
    -    * already have a column family for that and return it or create a new 
one if it doesn't exist.
    +    * Registers a k/v state information, which includes its state id, 
type, RocksDB column family handle, and serializers.
         *
    -    * <p>This also checks whether the {@link StateDescriptor} for a state 
matches the one
    -    * that we checkpointed, i.e. is already in the map of column families.
    +    * When restoring from a snapshot, we don’t restore the individual 
k/v states, just the global RocksDB database and
    +    * the list of k/v state information. When a k/v state is first 
requested we check here whether we
    +    * already have a registered entry for that and return it (after some 
necessary state compatibility checks)
    +    * or create a new one if it does not exist.
         */
    -   @SuppressWarnings("rawtypes, unchecked")
    -   protected <N, S> ColumnFamilyHandle getColumnFamily(
    -           StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException, StateMigrationException {
    +   private Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> tryRegisterKvStateInformation(
    +                   StateDescriptor<?, ?> stateDesc,
    +                   TypeSerializer<?> namespaceSerializer) throws 
StateMigrationException, IOException {
     
                Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
    -                   kvStateInformation.get(descriptor.getName());
    -
    -           RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
    -                   descriptor.getType(),
    -                   descriptor.getName(),
    -                   namespaceSerializer,
    -                   descriptor.getSerializer());
    +                   kvStateInformation.get(stateDesc.getName());
     
    +           RegisteredKeyedBackendStateMetaInfo<?, ?> newMetaInfo;
                if (stateInfo != null) {
    -                   // TODO with eager registration in place, these checks 
should be moved to restore()
     
    -                   RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfo =
    -                           
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) 
restoredKvStateMetaInfos.get(descriptor.getName());
    +                   @SuppressWarnings("unchecked")
    +                   RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfoSnapshot =
    +                           
restoredKvStateMetaInfos.get(stateDesc.getName());
     
                        Preconditions.checkState(
    -                           Objects.equals(newMetaInfo.getName(), 
restoredMetaInfo.getName()),
    -                           "Incompatible state names. " +
    -                                   "Was [" + restoredMetaInfo.getName() + 
"], " +
    -                                   "registered with [" + 
newMetaInfo.getName() + "].");
    -
    -                   if (!Objects.equals(newMetaInfo.getStateType(), 
StateDescriptor.Type.UNKNOWN)
    -                           && 
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) 
{
    -
    -                           Preconditions.checkState(
    -                                   newMetaInfo.getStateType() == 
restoredMetaInfo.getStateType(),
    -                                   "Incompatible state types. " +
    -                                           "Was [" + 
restoredMetaInfo.getStateType() + "], " +
    -                                           "registered with [" + 
newMetaInfo.getStateType() + "].");
    -                   }
    +                           restoredMetaInfoSnapshot != null,
    +                           "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
    +                                   " but its corresponding restored 
snapshot cannot be found.");
     
    -                   // check compatibility results to determine if state 
migration is required
    -                   CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    -                           restoredMetaInfo.getNamespaceSerializer(),
    -                           null,
    -                           
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
    -                           newMetaInfo.getNamespaceSerializer());
    +                   newMetaInfo = 
RegisteredKeyedBackendStateMetaInfo.resolveKvStateCompatibility(
    +                           restoredMetaInfoSnapshot,
    +                           namespaceSerializer,
    +                           stateDesc);
     
    -                   CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
    -                           restoredMetaInfo.getStateSerializer(),
    -                           UnloadableDummyTypeSerializer.class,
    -                           
restoredMetaInfo.getStateSerializerConfigSnapshot(),
    -                           newMetaInfo.getStateSerializer());
    +                   stateInfo.f1 = newMetaInfo;
    +           } else {
    +                   String stateName = stateDesc.getName();
     
    -                   if (namespaceCompatibility.isRequiresMigration() || 
stateCompatibility.isRequiresMigration()) {
    -                           // TODO state migration currently isn't 
possible.
    -                           throw new StateMigrationException("State 
migration isn't supported, yet.");
    -                   } else {
    -                           stateInfo.f1 = newMetaInfo;
    -                           return stateInfo.f0;
    -                   }
    +                   newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
    +                           stateDesc.getType(),
    +                           stateName,
    +                           namespaceSerializer,
    +                           stateDesc.getSerializer());
    +
    +                   ColumnFamilyHandle columnFamily = 
createColumnFamily(stateName);
    +
    +                   stateInfo = Tuple2.of(columnFamily, newMetaInfo);
    +                   kvStateInformation.put(stateDesc.getName(), stateInfo);
                }
     
    -           byte[] nameBytes = 
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
    +           return stateInfo;
    +   }
    +
    +   /**
    +    * Creates a column family handle for use with a k/v state.
    +    */
    +   private ColumnFamilyHandle createColumnFamily(String stateName) throws 
IOException, StateMigrationException {
    --- End diff --
    
    This method no longer throws `StateMigrationException`, so the exception 
could be removed from its `throws` clause.


---

Reply via email to