Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114283114
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
                }
        }
     
    +   private static class RocksDBIncrementalSnapshotOperation {
    +
    +           private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +           private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +           private final long checkpointId;
    +
    +           private final long checkpointTimestamp;
    +
    +           private Map<String, StreamStateHandle> baseSstFiles;
    +
    +           private List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> stateMetaInfos = new ArrayList<>();
    +
    +           private FileSystem backupFileSystem;
    +           private Path backupPath;
    +
    +           private FSDataInputStream inputStream = null;
    +           private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
    +
    +           // new sst files since the last completed checkpoint
    +           private Set<String> newSstFileNames = new HashSet<>();
    +
    +           // handles to the sst files in the current snapshot
    +           private Map<String, StreamStateHandle> sstFiles = new 
HashMap<>();
    +
    +           // handles to the misc files in the current snapshot
    +           private Map<String, StreamStateHandle> miscFiles = new 
HashMap<>();
    +
    +           private StreamStateHandle metaStateHandle = null;
    +
    +           private RocksDBIncrementalSnapshotOperation(
    +                           RocksDBKeyedStateBackend<?> stateBackend,
    +                           CheckpointStreamFactory checkpointStreamFactory,
    +                           long checkpointId,
    +                           long checkpointTimestamp) {
    +
    +                   this.stateBackend = stateBackend;
    +                   this.checkpointStreamFactory = checkpointStreamFactory;
    +                   this.checkpointId = checkpointId;
    +                   this.checkpointTimestamp = checkpointTimestamp;
    +           }
    +
    +           private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
    +                   try {
    +                           final byte[] buffer = new byte[1024];
    +
    +                           FileSystem backupFileSystem = 
backupPath.getFileSystem();
    +                           inputStream = backupFileSystem.open(filePath);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           outputStream = checkpointStreamFactory
    +                                   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
    +
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +
    +                           return outputStream.closeAndGetHandle();
    +                   } finally {
    +                           if (inputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                                   inputStream.close();
    +                                   inputStream = null;
    +                           }
    +
    +                           if (outputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                                   outputStream = null;
    +                           }
    +                   }
    +           }
    +
    +           private StreamStateHandle materializeMetaData() throws 
Exception {
    +                   try {
    +                           outputStream = checkpointStreamFactory
    +                                   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           KeyedBackendSerializationProxy 
serializationProxy =
    +                                   new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +                           DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
    +
    +                           serializationProxy.write(out);
    +
    +                           return outputStream.closeAndGetHandle();
    +                   } finally {
    +                           if (outputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                                   outputStream = null;
    +                           }
    +                   }
    +           }
    +
    +           void takeSnapshot() throws Exception {
    +                   // use the last completed checkpoint as the comparison 
base.
    +                   baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +                   // save meta data
    +                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : 
stateBackend.kvStateInformation.entrySet()) {
    +
    +                           RegisteredBackendStateMetaInfo<?, ?> metaInfo = 
stateMetaInfoEntry.getValue().f1;
    +
    +                           KeyedBackendSerializationProxy.StateMetaInfo<?, 
?> metaInfoProxy =
    +                                   new 
KeyedBackendSerializationProxy.StateMetaInfo<>(
    +                                           metaInfo.getStateType(),
    +                                           metaInfo.getName(),
    +                                           
metaInfo.getNamespaceSerializer(),
    +                                           metaInfo.getStateSerializer());
    +
    +                           stateMetaInfos.add(metaInfoProxy);
    +                   }
    +
    +                   // save state data
    +                   backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +                   backupFileSystem = backupPath.getFileSystem();
    +                   if (backupFileSystem.exists(backupPath)) {
    +                           LOG.warn("Deleting an existing local checkpoint 
directory " +
    +                                   backupPath + ".");
    +
    +                           backupFileSystem.delete(backupPath, true);
    +                   }
    +
    +                   // create hard links of living files in the checkpoint 
path
    +                   Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db);
    +                   checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    @gyfora I think your initial concerns are valid. We need to flush the 
memtable to disk prior to a checkpoint, and this will generate an SST. What is 
required for fast checkpoint intervals is an alternative mechanism to quickly 
determine a delta from the previous incremental checkpoint. This could be a 
changelog buffer in Flink that we maintain up to a certain size. The content 
becomes part of the private state in the incremental snapshot, and it is dropped 
i) after each checkpoint or ii) after exceeding a certain size that justifies 
writing a new SST.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to