[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993349#comment-15993349 ]
ASF GitHub Bot commented on FLINK-6364: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3801#discussion_r114364811 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend<?> stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) { + this.stateBackend = stateBackend; + } + + private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + 
stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles(); + for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) { + String fileName = sstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = sstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles(); + for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) { + String fileName = miscFileEntry.getKey(); + StreamStateHandle remoteFileHandle = miscFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + // read meta data + List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies = + readMetaData(restoreStateHandle.getMetaStateHandle()); + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); + columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET))); + + for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) { + + ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor( + stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET), + stateBackend.columnOptions); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + } + + if (hasExtraKeys) { --- End diff -- I wonder if we could prune key-groups based on this: https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys. If not, would it make sense to bulk the inserts using the multi-put feature? > Implement incremental checkpointing in RocksDBStateBackend > ---------------------------------------------------------- > > Key: FLINK-6364 > URL: https://issues.apache.org/jira/browse/FLINK-6364 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > {{RocksDBStateBackend}} is well suited for incremental checkpointing because > RocksDB is based on LSM trees, which record updates in new sst files and all > sst files are immutable. By only materializing those new sst files, we can > significantly improve the performance of checkpointing. -- This message was sent by Atlassian JIRA (v6.3.15#6346)