carp84 commented on a change in pull request #7674: [FLINK-10043] [State 
Backends] Refactor RocksDBKeyedStateBackend object 
construction/initialization/restore code
URL: https://github.com/apache/flink/pull/7674#discussion_r257748314
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##########
 @@ -510,831 +505,92 @@ public WriteOptions getWriteOptions() {
                return snapshotRunner;
        }
 
-       @Override
-       public void restore(Collection<KeyedStateHandle> restoreState) throws 
Exception {
-
-               LOG.info("Initializing RocksDB keyed state backend.");
-
-               // clear all meta data
-               kvStateInformation.clear();
-
-               try {
-                       RocksDBIncrementalRestoreOperation<K> 
incrementalRestoreOperation = null;
-                       if (restoreState == null || restoreState.isEmpty()) {
-                               createDB();
-                       } else {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Restoring snapshot from 
state handles: {}, will use {} thread(s) to download files from DFS.", 
restoreState, numberOfTransferingThreads);
-                               }
-
-                               KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
-                               if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
-                                       || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
-                                       incrementalRestoreOperation = new 
RocksDBIncrementalRestoreOperation<>(this);
-                                       
incrementalRestoreOperation.restore(restoreState);
-                               } else {
-                                       RocksDBFullRestoreOperation<K> 
fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
-                                       
fullRestoreOperation.doRestore(restoreState);
-                               }
-                       }
-
-                       // it is important that we only create the key builder 
after the restore, and not before;
-                       // restore operations may reconfigure the key 
serializer, so accessing the key serializer
-                       // only now we can be certain that the key serializer 
used in the builder is final.
-                       this.sharedRocksKeyBuilder = new 
RocksDBSerializedCompositeKeyBuilder<>(
-                               getKeySerializer(),
+       private RocksDBRestoreOperation<K> getRocksDBRestoreOperation(int 
keyGroupPrefixBytes,
+                                                                               
                                                CloseableRegistry 
cancelStreamRegistry,
+                                                                               
                                                LinkedHashMap<String, 
StateColumnFamilyHandle> kvStateInformation,
+                                                                               
                                                RocksDBNativeMetricMonitor 
nativeMetricMonitor,
+                                                                               
                                                Collection<KeyedStateHandle> 
restoreState) {
+               File instanceRocksDBPath = new File(instanceBasePath, 
DB_INSTANCE_DIR_STRING);
+               if (restoreState == null || restoreState.isEmpty()) {
+                       return new RocksDBNoneRestoreOperation<>(
+                               keyGroupRange,
                                keyGroupPrefixBytes,
-                               32);
-
-                       initializeSnapshotStrategy(incrementalRestoreOperation);
-               } catch (Exception ex) {
-                       dispose();
-                       throw ex;
-               }
-       }
-
-       @VisibleForTesting
-       void initializeSnapshotStrategy(@Nullable 
RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
-
-               this.savepointSnapshotStrategy =
-                       new RocksFullSnapshotStrategy<>(
-                               db,
-                               rocksDBResourceGuard,
-                               getKeySerializer(),
+                               numberOfTransferingThreads,
+                               cancelStreamRegistry,
+                               userCodeClassLoader,
                                kvStateInformation,
+                               keySerializerProvider,
+                               instanceBasePath,
+                               instanceRocksDBPath,
+                               dbOptions,
+                               columnOptions,
+                               nativeMetricMonitor);
+               }
+               KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+               boolean isIncrementalStateHandle = (firstStateHandle instanceof 
IncrementalKeyedStateHandle)
+                       || (firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle);
+               if (isIncrementalStateHandle) {
+                       return new RocksDBIncrementalRestoreOperation<>(
+                               operatorIdentifier,
                                keyGroupRange,
                                keyGroupPrefixBytes,
-                               localRecoveryConfig,
+                               numberOfTransferingThreads,
                                cancelStreamRegistry,
-                               keyGroupCompressionDecorator);
-
-               if (enableIncrementalCheckpointing) {
-                       final UUID backendUID;
-                       final SortedMap<Long, Set<StateHandleID>> 
materializedSstFiles;
-                       final long lastCompletedCheckpointId;
-
-                       if (incrementalRestoreOperation == null) {
-                               backendUID = UUID.randomUUID();
-                               materializedSstFiles = new TreeMap<>();
-                               lastCompletedCheckpointId = -1L;
-                       } else {
-                               backendUID = 
Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
-                               materializedSstFiles = 
Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
-                               lastCompletedCheckpointId = 
incrementalRestoreOperation.getLastCompletedCheckpointId();
-                               
Preconditions.checkState(lastCompletedCheckpointId >= 0L);
-                       }
-                       // TODO eventually we might want to separate savepoint 
and snapshot strategy, i.e. having 2 strategies.
-                       this.checkpointSnapshotStrategy = new 
RocksIncrementalSnapshotStrategy<>(
-                               db,
-                               rocksDBResourceGuard,
-                               getKeySerializer(),
+                               userCodeClassLoader,
                                kvStateInformation,
+                               keySerializerProvider,
+                               instanceBasePath,
+                               instanceRocksDBPath,
+                               dbOptions,
+                               columnOptions,
+                               nativeMetricMonitor);
+               } else {
+                       return new RocksDBFullRestoreOperation<>(
                                keyGroupRange,
                                keyGroupPrefixBytes,
-                               localRecoveryConfig,
+                               numberOfTransferingThreads,
                                cancelStreamRegistry,
+                               userCodeClassLoader,
+                               kvStateInformation,
+                               keySerializerProvider,
                                instanceBasePath,
-                               backendUID,
-                               materializedSstFiles,
-                               lastCompletedCheckpointId,
-                               numberOfTransferingThreads);
-               } else {
-                       this.checkpointSnapshotStrategy = 
savepointSnapshotStrategy;
+                               instanceRocksDBPath,
+                               dbOptions,
+                               columnOptions,
+                               nativeMetricMonitor);
                }
        }
 
        @Override
-       public void notifyCheckpointComplete(long completedCheckpointId) throws 
Exception {
-
-               if (checkpointSnapshotStrategy != null) {
-                       
checkpointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
-               }
+       public void restore(Collection<KeyedStateHandle> restoreState) throws 
Exception {
 
-               if (savepointSnapshotStrategy != null) {
-                       
savepointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
+               if (restoreState == null) {
+                       return;
                }
-       }
-
-       private void createDB() throws IOException {
-               List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
-               this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
-               this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, 
writeOptions);
-               this.defaultColumnFamily = columnFamilyHandles.get(0);
-       }
-
-       private RocksDB openDB(
-               String path,
-               List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-               List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
-
-               List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-                       new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
-
-               // we add the required descriptor for the default CF in FIRST 
position, see
-               // 
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
-               columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
-               columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
-
-               RocksDB dbRef;
+               LOG.info("Restoring RocksDB keyed state backend.");
 
                try {
-                       dbRef = RocksDB.open(
-                               Preconditions.checkNotNull(dbOptions),
-                               Preconditions.checkNotNull(path),
-                               columnFamilyDescriptors,
-                               stateColumnFamilyHandles);
-               } catch (RocksDBException e) {
-                       throw new IOException("Error while opening RocksDB 
instance.", e);
-               }
-
-               // requested + default CF
-               Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
-                       "Not all requested column family handles have been 
created");
-
-               if (this.metricOptions.isEnabled()) {
-                       this.nativeMetricMonitor = new 
RocksDBNativeMetricMonitor(
-                               dbRef,
-                               metricOptions,
-                               metricGroup
-                       );
-               }
-
-               return dbRef;
-       }
-
-       /**
-        * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a full snapshot.
-        */
-       private static final class RocksDBFullRestoreOperation<K> {
-
-               private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
-
-               /** Current key-groups state handle from which we restore 
key-groups. */
-               private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-               /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
-               private FSDataInputStream currentStateHandleInStream;
-               /** Current data input view that wraps 
currentStateHandleInStream. */
-               private DataInputView currentStateHandleInView;
-               /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
-               private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
-               /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
-               private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
-
-               private boolean isKeySerializerCompatibilityChecked;
-
-               /**
-                * Creates a restore operation object for the given state 
backend instance.
-                *
-                * @param rocksDBKeyedStateBackend the state backend into which 
we restore
-                */
-               RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
-                       this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
-               }
-
-               /**
-                * Restores all key-groups data that is referenced by the 
passed state handles.
-                *
-                * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
-                */
-               void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-                       throws IOException, StateMigrationException, 
RocksDBException {
-
-                       rocksDBKeyedStateBackend.createDB();
-
-                       for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
-                               if (keyedStateHandle != null) {
-
-                                       if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
-                                               throw new 
IllegalStateException("Unexpected state handle type, " +
-                                                       "expected: " + 
KeyGroupsStateHandle.class +
-                                                       ", but found: " + 
keyedStateHandle.getClass());
-                                       }
-                                       this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
-                                       restoreKeyGroupsInStateHandle();
-                               }
-                       }
-               }
-
-               /**
-                * Restore one key groups state handle.
-                */
-               private void restoreKeyGroupsInStateHandle()
-                       throws IOException, StateMigrationException, 
RocksDBException {
-                       try {
-                               currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
-                               
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-                               currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
-                               restoreKVStateMetaData();
-                               restoreKVStateData();
-                       } finally {
-                               if 
(rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
-                                       
IOUtils.closeQuietly(currentStateHandleInStream);
-                               }
-                       }
-               }
-
-               /**
-                * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
-                */
-               private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
-
-                       // isSerializerPresenceRequired flag is set to false, 
since for the RocksDB state backend,
-                       // deserialization of state happens lazily during 
runtime; we depend on the fact
-                       // that the new serializer for states could be 
compatible, and therefore the restore can continue
-                       // without old serializers required to be present.
-                       KeyedBackendSerializationProxy<K> serializationProxy =
-                               new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
-
-                       serializationProxy.read(currentStateHandleInView);
-
-                       if (!isKeySerializerCompatibilityChecked) {
-                               // check for key serializer compatibility; this 
also reconfigures the
-                               // key serializer to be compatible, if it is 
required and is possible
-                               TypeSerializerSchemaCompatibility<K> 
keySerializerSchemaCompat =
-                                       
rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
-                               if 
(keySerializerSchemaCompat.isCompatibleAfterMigration() || 
keySerializerSchemaCompat.isIncompatible()) {
-                                       throw new StateMigrationException("The 
new key serializer must be compatible.");
-                               }
-
-                               isKeySerializerCompatibilityChecked = true;
-                       }
-
-                       this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
-                               SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
-
-                       List<StateMetaInfoSnapshot> restoredMetaInfos =
-                               serializationProxy.getStateMetaInfoSnapshots();
-                       currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-
-                       for (StateMetaInfoSnapshot restoredMetaInfo : 
restoredMetaInfos) {
-
-                               Tuple2<ColumnFamilyHandle, 
RegisteredStateMetaInfoBase> registeredColumn =
-                                       
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
-
-                               if (registeredColumn == null) {
-                                       byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-                                       ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
-                                               nameBytes,
-                                               
rocksDBKeyedStateBackend.columnOptions);
-
-                                       ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
-
-                                       // create a meta info for the state on 
restore;
-                                       // this allows us to retain the state 
in future snapshots even if it wasn't accessed
-                                       RegisteredStateMetaInfoBase 
stateMetaInfo =
-                                               
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
-                                       registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
-                                       
rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), 
registeredColumn);
-
-                               } else {
-                                       // TODO with eager state registration 
in place, check here for serializer migration strategies
-                               }
-                               
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
-                       }
-               }
-
-               /**
-                * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
-                */
-               private void restoreKVStateData() throws IOException, 
RocksDBException {
-                       //for all key-groups in the current state handle...
-                       try (RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
-                               for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-                                       int keyGroup = keyGroupOffset.f0;
-
-                                       // Check that restored key groups all 
belong to the backend
-                                       
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
-                                               "The key group must belong to 
the backend");
-
-                                       long offset = keyGroupOffset.f1;
-                                       //not empty key-group?
-                                       if (0L != offset) {
-                                               
currentStateHandleInStream.seek(offset);
-                                               try (InputStream compressedKgIn 
= 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
-                                                       
DataInputViewStreamWrapper compressedKgInputView = new 
DataInputViewStreamWrapper(compressedKgIn);
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       int kvStateId = 
compressedKgInputView.readShort();
-                                                       ColumnFamilyHandle 
handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                                       //insert all k/v pairs 
into DB
-                                                       boolean 
keyGroupHasMoreKeys = true;
-                                                       while 
(keyGroupHasMoreKeys) {
-                                                               byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-                                                               byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-                                                               if 
(hasMetaDataFollowsFlag(key)) {
-                                                                       //clear 
the signal bit in the key to make it ready for insertion again
-                                                                       
clearMetaDataFollowsFlag(key);
-                                                                       
writeBatchWrapper.put(handle, key, value);
-                                                                       //TODO 
this could be aware of keyGroupPrefixBytes and write only one byte if possible
-                                                                       
kvStateId = END_OF_KEY_GROUP_MARK
-                                                                               
& compressedKgInputView.readShort();
-                                                                       if 
(END_OF_KEY_GROUP_MARK == kvStateId) {
-                                                                               
keyGroupHasMoreKeys = false;
-                                                                       } else {
-                                                                               
handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                                                       }
-                                                               } else {
-                                                                       
writeBatchWrapper.put(handle, key, value);
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
+                       RocksDBRestoreOperation<K> restoreOperation = 
getRocksDBRestoreOperation(
 
 Review comment:
   For the current code, yes, I agree. My question is actually whether we need 
to reserve it for future usage, although I don't know exactly what the 
requirement will be. Or maybe we could add it back whenever the need emerges; 
it won't be hard anyway (smile).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to