[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996212#comment-15996212 ]

ASF GitHub Bot commented on FLINK-6364:
---------------------------------------

Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114703775
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
                }
        }
     
    +   private static class RocksDBIncrementalSnapshotOperation {
    +
    +           private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +           private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +           private final long checkpointId;
    +
    +           private final long checkpointTimestamp;
    +
    +           private Map<String, StreamStateHandle> baseSstFiles;
    +
    +           private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +           private FileSystem backupFileSystem;
    +           private Path backupPath;
    +
    +           private FSDataInputStream inputStream = null;
    +           private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +           // new sst files since the last completed checkpoint
    +           private Set<String> newSstFileNames = new HashSet<>();
    +
    +           // handles to the sst files in the current snapshot
    +           private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +           // handles to the misc files in the current snapshot
    +           private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +           private StreamStateHandle metaStateHandle = null;
    +
    +           private RocksDBIncrementalSnapshotOperation(
    +                           RocksDBKeyedStateBackend<?> stateBackend,
    +                           CheckpointStreamFactory checkpointStreamFactory,
    +                           long checkpointId,
    +                           long checkpointTimestamp) {
    +
    +                   this.stateBackend = stateBackend;
    +                   this.checkpointStreamFactory = checkpointStreamFactory;
    +                   this.checkpointId = checkpointId;
    +                   this.checkpointTimestamp = checkpointTimestamp;
    +           }
    +
    +           private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +                   try {
    +                           final byte[] buffer = new byte[1024];
    +
    +                           FileSystem backupFileSystem = backupPath.getFileSystem();
    +                           inputStream = backupFileSystem.open(filePath);
    +                           stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           outputStream = checkpointStreamFactory
    +                                   .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +                           stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
    +
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +
    +                           return outputStream.closeAndGetHandle();
    +                   } finally {
    +                           if (inputStream != null) {
    +                                   stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                                   inputStream.close();
    +                                   inputStream = null;
    +                           }
    +
    +                           if (outputStream != null) {
    +                                   stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                                   outputStream = null;
    +                           }
    +                   }
    +           }
    +
    +           private StreamStateHandle materializeMetaData() throws Exception {
    +                   try {
    +                           outputStream = checkpointStreamFactory
    +                                   .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +                           stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           KeyedBackendSerializationProxy serializationProxy =
    +                                   new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +                           DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +                           serializationProxy.write(out);
    +
    +                           return outputStream.closeAndGetHandle();
    +                   } finally {
    +                           if (outputStream != null) {
    +                                   stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                                   outputStream = null;
    +                           }
    +                   }
    +           }
    +
    +           void takeSnapshot() throws Exception {
    +                   // use the last completed checkpoint as the comparison base.
    +                   baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +                   // save meta data
    +                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +                           RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +                           KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +                                   new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +                                           metaInfo.getStateType(),
    +                                           metaInfo.getName(),
    +                                           metaInfo.getNamespaceSerializer(),
    +                                           metaInfo.getStateSerializer());
    +
    +                           stateMetaInfos.add(metaInfoProxy);
    +                   }
    +
    +                   // save state data
    +                   backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +                   backupFileSystem = backupPath.getFileSystem();
    +                   if (backupFileSystem.exists(backupPath)) {
    +                           LOG.warn("Deleting an existing local checkpoint directory " +
    +                                   backupPath + ".");
    +
    +                           backupFileSystem.delete(backupPath, true);
    +                   }
    +
    +                   // create hard links of living files in the checkpoint path
    +                   Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +                   checkpoint.createCheckpoint(backupPath.getPath());
    +           }
    +
    +           KeyedStateHandle materializeSnapshot() throws Exception {
    +                   // write meta data
    +                   metaStateHandle = materializeMetaData();
    +
    +                   // write state data
    +                   Preconditions.checkState(backupFileSystem.exists(backupPath));
    +
    +                   FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
    +                   if (fileStatuses != null) {
    +                           for (FileStatus fileStatus : fileStatuses) {
    +                                   Path filePath = fileStatus.getPath();
    +                                   String fileName = filePath.getName();
    +
    +                                   if (fileName.endsWith(SST_FILE_SUFFIX)) {
    +                                           StreamStateHandle fileHandle =
    +                                                   baseSstFiles == null ? null : baseSstFiles.get(fileName);
    +
    +                                           if (fileHandle == null) {
    +                                                   newSstFileNames.add(fileName);
    +                                                   fileHandle = materializeStateData(filePath);
    +                                           }
    +
    +                                           sstFiles.put(fileName, fileHandle);
    +                                   } else {
    +                                           StreamStateHandle fileHandle = materializeStateData(filePath);
    +                                           miscFiles.put(fileName, fileHandle);
    +                                   }
    +                           }
    +                   }
    +
    +                   stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
    +
    +                   return new RocksDBKeyedStateHandle(stateBackend.jobId,
    +                           stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
    +                           newSstFileNames, sstFiles, miscFiles, metaStateHandle);
    +           }
    +
    +           void releaseResources(boolean canceled) {
    +
    +                   if (inputStream != null) {
    +                           stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                           try {
    +                                   inputStream.close();
    +                           } catch (Exception e) {
    +                                   LOG.warn("Could not properly close the input stream.", e);
    +                           }
    +                           inputStream = null;
    +                   }
    +
    +                   if (outputStream != null) {
    +                           stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                           try {
    +                                   outputStream.close();
    +                           } catch (Exception e) {
    +                                   LOG.warn("Could not properly close the output stream.", e);
    +                           }
    +                           outputStream = null;
    +                   }
    +
    +                   if (backupPath != null) {
    +                           try {
    +                                   if (backupFileSystem.exists(backupPath)) {
    +                                           backupFileSystem.delete(backupPath, true);
    +                                   }
    +                           } catch (Exception e) {
    +                                   LOG.warn("Could not properly delete the checkpoint directory.", e);
    +                           }
    +                   }
    +
    +                   if (canceled) {
    +                           List<StateObject> statesToDiscard = new ArrayList<>();
    +
    +                           if (metaStateHandle != null) {
    +                                   statesToDiscard.add(metaStateHandle);
    +                           }
    +
    +                           statesToDiscard.addAll(miscFiles.values());
    +
    +                           for (String newSstFileName : newSstFileNames) {
    +                                   StreamStateHandle fileHandle = sstFiles.get(newSstFileName);
    +                                   if (fileHandle != null) {
    +                                           statesToDiscard.add(fileHandle);
    +                                   }
    +                           }
    +
    +                           try {
    +                                   StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
    --- End diff --
    
    You are absolutely right. Global cleanup hooks are urgently needed here to clean up unused state. The hook should live at the job manager rather than in the shared state registry, because unused private state must be cleaned up as well. At startup, the hook will know the checkpoint from which we are restoring and will retain only the data of the restored completed checkpoints.
    
    The local checkpoint directories will be deleted once the checkpoint completes at the TM. Since the local checkpoint directories all live under the backend's directory, which is deleted when the backend is disposed, they will also be deleted as long as the backend is closed correctly.
    
    But in the case where the TM fails while closing the backend, the local checkpoint directories will be left on the file system. This is not an issue on YARN clusters, but it may be severe on standalone clusters. What do you think of this problem?
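    To make the startup-time cleanup concrete, here is a minimal sketch of what a TM-side sweep of stale local checkpoint directories could look like. This is not code from the PR; `StaleCheckpointCleaner`, its method names, and the "chk-<id>" naming convention for directories under the backend's instance base path are assumptions taken from the diff above, and it uses plain `java.io.File` instead of Flink's `FileSystem` abstraction:

    ```java
    import java.io.File;

    // Hypothetical sketch: if a TM dies while closing the backend, local
    // "chk-<id>" directories may survive under the backend's instance base
    // path. A startup hook could scan for them and delete any that do not
    // belong to the checkpoint being restored.
    public class StaleCheckpointCleaner {

        /** Recursively deletes a file or directory. */
        private static void deleteRecursively(File file) {
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    deleteRecursively(child);
                }
            }
            file.delete();
        }

        /**
         * Removes all local checkpoint directories except the one belonging to
         * the checkpoint we are restoring from. A negative restoredCheckpointId
         * means a fresh start, so every "chk-*" directory is stale.
         */
        public static void deleteStaleLocalCheckpoints(File instanceBasePath, long restoredCheckpointId) {
            File[] entries = instanceBasePath.listFiles();
            if (entries == null) {
                return;
            }
            String keep = "chk-" + restoredCheckpointId;
            for (File entry : entries) {
                if (entry.isDirectory()
                        && entry.getName().startsWith("chk-")
                        && !entry.getName().equals(keep)) {
                    deleteRecursively(entry);
                }
            }
        }
    }
    ```

    The same idea would apply at the job manager for shared (remote) state, where the hook additionally needs to know which completed checkpoints were restored.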


> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-6364
>                 URL: https://issues.apache.org/jira/browse/FLINK-6364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because 
> RocksDB is based on LSM trees, which record updates in new sst files, and all 
> sst files are immutable. By materializing only the new sst files, we can 
> significantly improve the performance of checkpointing.
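The core idea in the description above reduces to a set difference: because sst files are immutable, any file already uploaded for the previous checkpoint never needs re-uploading, and only files absent from the previous checkpoint's file set must be materialized. A toy illustration (the class and file names here are illustrative, not from Flink):

```java
import java.util.HashSet;
import java.util.Set;

// Toy illustration of incremental checkpointing: given the sst files of the
// previous completed checkpoint and the current live sst files, only the
// files new since the previous checkpoint need to be uploaded.
public class IncrementalDiff {

    /** Returns the sst files present now but absent from the previous checkpoint. */
    public static Set<String> newSstFiles(Set<String> previous, Set<String> current) {
        Set<String> diff = new HashSet<>(current);
        diff.removeAll(previous);
        return diff;
    }
}
```

Files in `previous` but not in `current` were compacted away; they stay referenced by the old checkpoint until that checkpoint itself is discarded, which is exactly why the cleanup discussion in the comment above matters.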



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
