carp84 commented on a change in pull request #7674: [FLINK-10043] [State Backends] Refactor RocksDBKeyedStateBackend object construction/initialization/restore code
URL: https://github.com/apache/flink/pull/7674#discussion_r257748314
########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##########

@@ -510,831 +505,92 @@ public WriteOptions getWriteOptions() {
 		return snapshotRunner;
 	}

-	@Override
-	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
-
-		LOG.info("Initializing RocksDB keyed state backend.");
-
-		// clear all meta data
-		kvStateInformation.clear();
-
-		try {
-			RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation = null;
-			if (restoreState == null || restoreState.isEmpty()) {
-				createDB();
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, numberOfTransferingThreads);
-				}
-
-				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
-				if (firstStateHandle instanceof IncrementalKeyedStateHandle
-					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
-					incrementalRestoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-					incrementalRestoreOperation.restore(restoreState);
-				} else {
-					RocksDBFullRestoreOperation<K> fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
-					fullRestoreOperation.doRestore(restoreState);
-				}
-			}
-
-			// it is important that we only create the key builder after the restore, and not before;
-			// restore operations may reconfigure the key serializer, so accessing the key serializer
-			// only now we can be certain that the key serializer used in the builder is final.
-			this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
-				getKeySerializer(),
+	private RocksDBRestoreOperation<K> getRocksDBRestoreOperation(int keyGroupPrefixBytes,
+		CloseableRegistry cancelStreamRegistry,
+		LinkedHashMap<String, StateColumnFamilyHandle> kvStateInformation,
+		RocksDBNativeMetricMonitor nativeMetricMonitor,
+		Collection<KeyedStateHandle> restoreState) {
+		File instanceRocksDBPath = new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
+		if (restoreState == null || restoreState.isEmpty()) {
+			return new RocksDBNoneRestoreOperation<>(
+				keyGroupRange,
 				keyGroupPrefixBytes,
-				32);
-
-			initializeSnapshotStrategy(incrementalRestoreOperation);
-		} catch (Exception ex) {
-			dispose();
-			throw ex;
-		}
-	}
-
-	@VisibleForTesting
-	void initializeSnapshotStrategy(@Nullable RocksDBIncrementalRestoreOperation<K> incrementalRestoreOperation) {
-
-		this.savepointSnapshotStrategy =
-			new RocksFullSnapshotStrategy<>(
-				db,
-				rocksDBResourceGuard,
-				getKeySerializer(),
+				numberOfTransferingThreads,
+				cancelStreamRegistry,
+				userCodeClassLoader,
 				kvStateInformation,
+				keySerializerProvider,
+				instanceBasePath,
+				instanceRocksDBPath,
+				dbOptions,
+				columnOptions,
+				nativeMetricMonitor);
+		}
+		KeyedStateHandle firstStateHandle = restoreState.iterator().next();
+		boolean isIncrementalStateHandle = (firstStateHandle instanceof IncrementalKeyedStateHandle)
+			|| (firstStateHandle instanceof IncrementalLocalKeyedStateHandle);
+		if (isIncrementalStateHandle) {
+			return new RocksDBIncrementalRestoreOperation<>(
+				operatorIdentifier,
 				keyGroupRange,
 				keyGroupPrefixBytes,
-				localRecoveryConfig,
+				numberOfTransferingThreads,
 				cancelStreamRegistry,
-				keyGroupCompressionDecorator);
-
-		if (enableIncrementalCheckpointing) {
-			final UUID backendUID;
-			final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
-			final long lastCompletedCheckpointId;
-
-			if (incrementalRestoreOperation == null) {
-				backendUID = UUID.randomUUID();
-				materializedSstFiles = new TreeMap<>();
-				lastCompletedCheckpointId = -1L;
-			} else {
-				backendUID = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
-				materializedSstFiles = Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredSstFiles());
-				lastCompletedCheckpointId = incrementalRestoreOperation.getLastCompletedCheckpointId();
-				Preconditions.checkState(lastCompletedCheckpointId >= 0L);
-			}
-			// TODO eventually we might want to separate savepoint and snapshot strategy, i.e. having 2 strategies.
-			this.checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(
-				db,
-				rocksDBResourceGuard,
-				getKeySerializer(),
+				userCodeClassLoader,
 				kvStateInformation,
+				keySerializerProvider,
+				instanceBasePath,
+				instanceRocksDBPath,
+				dbOptions,
+				columnOptions,
+				nativeMetricMonitor);
+		} else {
+			return new RocksDBFullRestoreOperation<>(
 				keyGroupRange,
 				keyGroupPrefixBytes,
-				localRecoveryConfig,
+				numberOfTransferingThreads,
 				cancelStreamRegistry,
+				userCodeClassLoader,
+				kvStateInformation,
+				keySerializerProvider,
 				instanceBasePath,
-				backendUID,
-				materializedSstFiles,
-				lastCompletedCheckpointId,
-				numberOfTransferingThreads);
-		} else {
-			this.checkpointSnapshotStrategy = savepointSnapshotStrategy;
+				instanceRocksDBPath,
+				dbOptions,
+				columnOptions,
+				nativeMetricMonitor);
 		}
 	}

 	@Override
-	public void notifyCheckpointComplete(long completedCheckpointId) throws Exception {
-
-		if (checkpointSnapshotStrategy != null) {
-			checkpointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
-		}
+	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {

-		if (savepointSnapshotStrategy != null) {
-			savepointSnapshotStrategy.notifyCheckpointComplete(completedCheckpointId);
+		if (restoreState == null) {
+			return;
 		}
-	}
-
-	private void createDB() throws IOException {
-		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
-		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
-		this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
-		this.defaultColumnFamily = columnFamilyHandles.get(0);
-	}
-
-	private RocksDB openDB(
-		String path,
-		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
-
-		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
-
-		// we add the required descriptor for the default CF in FIRST position, see
-		// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
-		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
-
-		RocksDB dbRef;
+		LOG.info("Restoring RocksDB keyed state backend.");

 		try {
-			dbRef = RocksDB.open(
-				Preconditions.checkNotNull(dbOptions),
-				Preconditions.checkNotNull(path),
-				columnFamilyDescriptors,
-				stateColumnFamilyHandles);
-		} catch (RocksDBException e) {
-			throw new IOException("Error while opening RocksDB instance.", e);
-		}
-
-		// requested + default CF
-		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
-			"Not all requested column family handles have been created");
-
-		if (this.metricOptions.isEnabled()) {
-			this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
-				dbRef,
-				metricOptions,
-				metricGroup
-			);
-		}
-
-		return dbRef;
-	}
-
-	/**
-	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot.
-	 */
-	private static final class RocksDBFullRestoreOperation<K> {
-
-		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
-
-		/** Current key-groups state handle from which we restore key-groups. */
-		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
-		private FSDataInputStream currentStateHandleInStream;
-		/** Current data input view that wraps currentStateHandleInStream. */
-		private DataInputView currentStateHandleInView;
-		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
-		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
-		/** The compression decorator that was used for writing the state, as determined by the meta data. */
-		private StreamCompressionDecorator keygroupStreamCompressionDecorator;
-
-		private boolean isKeySerializerCompatibilityChecked;
-
-		/**
-		 * Creates a restore operation object for the given state backend instance.
-		 *
-		 * @param rocksDBKeyedStateBackend the state backend into which we restore
-		 */
-		RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
-			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
-		}
-
-		/**
-		 * Restores all key-groups data that is referenced by the passed state handles.
-		 *
-		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
-		 */
-		void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-			throws IOException, StateMigrationException, RocksDBException {
-
-			rocksDBKeyedStateBackend.createDB();
-
-			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-				if (keyedStateHandle != null) {
-
-					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
-						throw new IllegalStateException("Unexpected state handle type, " +
-							"expected: " + KeyGroupsStateHandle.class +
-							", but found: " + keyedStateHandle.getClass());
-					}
-					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
-					restoreKeyGroupsInStateHandle();
-				}
-			}
-		}
-
-		/**
-		 * Restore one key groups state handle.
-		 */
-		private void restoreKeyGroupsInStateHandle()
-			throws IOException, StateMigrationException, RocksDBException {
-			try {
-				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
-				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
-				restoreKVStateMetaData();
-				restoreKVStateData();
-			} finally {
-				if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
-					IOUtils.closeQuietly(currentStateHandleInStream);
-				}
-			}
-		}
-
-		/**
-		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
-		 */
-		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
-
-			// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
-			// deserialization of state happens lazily during runtime; we depend on the fact
-			// that the new serializer for states could be compatible, and therefore the restore can continue
-			// without old serializers required to be present.
-			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
-
-			serializationProxy.read(currentStateHandleInView);
-
-			if (!isKeySerializerCompatibilityChecked) {
-				// check for key serializer compatibility; this also reconfigures the
-				// key serializer to be compatible, if it is required and is possible
-				TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
-					rocksDBKeyedStateBackend.checkKeySerializerSchemaCompatibility(serializationProxy.getKeySerializerSnapshot());
-				if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
-					throw new StateMigrationException("The new key serializer must be compatible.");
-				}
-
-				isKeySerializerCompatibilityChecked = true;
-			}
-
-			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
-				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
-
-			List<StateMetaInfoSnapshot> restoredMetaInfos =
-				serializationProxy.getStateMetaInfoSnapshots();
-			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
-
-			for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
-
-				Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn =
-					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
-
-				if (registeredColumn == null) {
-					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-						nameBytes,
-						rocksDBKeyedStateBackend.columnOptions);
-
-					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
-
-					// create a meta info for the state on restore;
-					// this allows us to retain the state in future snapshots even if it wasn't accessed
-					RegisteredStateMetaInfoBase stateMetaInfo =
-						RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(restoredMetaInfo);
-					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
-					rocksDBKeyedStateBackend.kvStateInformation.put(restoredMetaInfo.getName(), registeredColumn);
-
-				} else {
-					// TODO with eager state registration in place, check here for serializer migration strategies
-				}
-				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
-			}
-		}
-
-		/**
-		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
-		 */
-		private void restoreKVStateData() throws IOException, RocksDBException {
-			//for all key-groups in the current state handle...
-			try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(rocksDBKeyedStateBackend.db)) {
-				for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-					int keyGroup = keyGroupOffset.f0;
-
-					// Check that restored key groups all belong to the backend
-					Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
-						"The key group must belong to the backend");
-
-					long offset = keyGroupOffset.f1;
-					//not empty key-group?
-					if (0L != offset) {
-						currentStateHandleInStream.seek(offset);
-						try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
-							DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							int kvStateId = compressedKgInputView.readShort();
-							ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-							//insert all k/v pairs into DB
-							boolean keyGroupHasMoreKeys = true;
-							while (keyGroupHasMoreKeys) {
-								byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-								byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-								if (hasMetaDataFollowsFlag(key)) {
-									//clear the signal bit in the key to make it ready for insertion again
-									clearMetaDataFollowsFlag(key);
-									writeBatchWrapper.put(handle, key, value);
-									//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-									kvStateId = END_OF_KEY_GROUP_MARK
-										& compressedKgInputView.readShort();
-									if (END_OF_KEY_GROUP_MARK == kvStateId) {
-										keyGroupHasMoreKeys = false;
-									} else {
-										handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-									}
-								} else {
-									writeBatchWrapper.put(handle, key, value);
-								}
-							}
-						}
-					}
-				}
+			RocksDBRestoreOperation<K> restoreOperation = getRocksDBRestoreOperation(

Review comment:
   For the current code, yes, I agree. My question is actually whether we need to reserve it for future use, although I don't know exactly what the requirement will be. Or maybe we could add it back whenever the need emerges; it won't be hard anyway (smile).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
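For readers skimming the hunk above: the new getRocksDBRestoreOperation(...) selects one of three restore operations purely from the passed state handles. No handles yields RocksDBNoneRestoreOperation, a first handle of type IncrementalKeyedStateHandle or IncrementalLocalKeyedStateHandle yields RocksDBIncrementalRestoreOperation, and anything else yields RocksDBFullRestoreOperation. The condensed Java sketch below restates only that dispatch; RestoreDispatchSketch, RestoreKind and chooseRestoreKind are illustrative names that do not exist in the PR, which instead constructs the concrete RocksDB*RestoreOperation instances with the backend's configuration.

// Illustrative sketch only: mirrors the dispatch in getRocksDBRestoreOperation above.
// RestoreDispatchSketch, RestoreKind and chooseRestoreKind are hypothetical names.
import java.util.Collection;

import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;

final class RestoreDispatchSketch {

	enum RestoreKind { NONE, INCREMENTAL, FULL }

	static RestoreKind chooseRestoreKind(Collection<KeyedStateHandle> restoreState) {
		if (restoreState == null || restoreState.isEmpty()) {
			// nothing to restore: the PR returns a RocksDBNoneRestoreOperation here
			return RestoreKind.NONE;
		}
		// only the first handle decides which restore path is taken
		KeyedStateHandle firstStateHandle = restoreState.iterator().next();
		boolean isIncremental = firstStateHandle instanceof IncrementalKeyedStateHandle
			|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle;
		// incremental handles -> RocksDBIncrementalRestoreOperation, otherwise RocksDBFullRestoreOperation
		return isIncremental ? RestoreKind.INCREMENTAL : RestoreKind.FULL;
	}
}

As in the hunk, only the first handle's type is inspected, so the handle collection is implicitly assumed to be homogeneous.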