[ 
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993349#comment-15993349
 ] 

ASF GitHub Bot commented on FLINK-6364:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114364811
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws 
IOException, RocksDBException {
                }
        }
     
    +   private static class RocksDBIncrementalRestoreOperation {
    +
    +           private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +           private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +                   this.stateBackend = stateBackend;
    +           }
    +
    +           private List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> readMetaData(
    +                           StreamStateHandle metaStateHandle) throws 
Exception {
    +
    +                   FSDataInputStream inputStream = null;
    +
    +                   try {
    +                           inputStream = metaStateHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           KeyedBackendSerializationProxy 
serializationProxy =
    +                                   new 
KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +                           DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
    +                           serializationProxy.read(in);
    +
    +                           return 
serializationProxy.getNamedStateSerializationProxies();
    +                   } finally {
    +                           if (inputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                                   inputStream.close();
    +                           }
    +                   }
    +           }
    +
    +           private void readStateData(
    +                           Path restoreFilePath,
    +                           StreamStateHandle remoteFileHandle) throws 
IOException {
    +
    +                   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
    +
    +                   FSDataInputStream inputStream = null;
    +                   FSDataOutputStream outputStream = null;
    +
    +                   try {
    +                           inputStream = 
remoteFileHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           byte[] buffer = new byte[1024];
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
    +
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +                   } finally {
    +                           if (inputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                                   inputStream.close();
    +                           }
    +
    +                           if (outputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                           }
    +                   }
    +           }
    +
    +           private void restoreInstance(
    +                           RocksDBKeyedStateHandle restoreStateHandle,
    +                           boolean hasExtraKeys) throws Exception {
    +
    +                   // read state data
    +                   Path restoreInstancePath = new Path(
    +                           stateBackend.instanceBasePath.getAbsolutePath(),
    +                           UUID.randomUUID().toString());
    +
    +                   try {
    +                           Map<String, StreamStateHandle> sstFiles = 
restoreStateHandle.getSstFiles();
    +                           for (Map.Entry<String, StreamStateHandle> 
sstFileEntry : sstFiles.entrySet()) {
    +                                   String fileName = sstFileEntry.getKey();
    +                                   StreamStateHandle remoteFileHandle = 
sstFileEntry.getValue();
    +
    +                                   readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
    +                           }
    +
    +                           Map<String, StreamStateHandle> miscFiles = 
restoreStateHandle.getMiscFiles();
    +                           for (Map.Entry<String, StreamStateHandle> 
miscFileEntry : miscFiles.entrySet()) {
    +                                   String fileName = 
miscFileEntry.getKey();
    +                                   StreamStateHandle remoteFileHandle = 
miscFileEntry.getValue();
    +
    +                                   readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
    +                           }
    +
    +                           // read meta data
    +                           
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +                                   
readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +                           List<ColumnFamilyDescriptor> 
columnFamilyDescriptors = new ArrayList<>();
    +                           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +                           for 
(KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : 
stateMetaInfoProxies) {
    +
    +                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +                                           
stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +                                           stateBackend.columnOptions);
    +
    +                                   
columnFamilyDescriptors.add(columnFamilyDescriptor);
    +                           }
    +
    +                           if (hasExtraKeys) {
    --- End diff --
    
    I wonder if we could prune the extra key-groups using the range deletion described here: 
https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys.
    
    If not, would it make sense to batch the inserts using the multi-put feature?


> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-6364
>                 URL: https://issues.apache.org/jira/browse/FLINK-6364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because 
> RocksDB is based on LSM trees, which record updates in new sst files, and all 
> sst files are immutable. By only materializing those new sst files, we can 
> significantly improve the performance of checkpointing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to