Github user shixiaogang commented on a diff in the pull request:
https://github.com/apache/flink/pull/3801#discussion_r114500597
--- Diff:
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
@@ -621,6 +692,237 @@ private static void checkInterrupted() throws
InterruptedException {
}
}
+ private static class RocksDBIncrementalSnapshotOperation {
+
+ private final RocksDBKeyedStateBackend<?> stateBackend;
+
+ private final CheckpointStreamFactory checkpointStreamFactory;
+
+ private final long checkpointId;
+
+ private final long checkpointTimestamp;
+
+ private Map<String, StreamStateHandle> baseSstFiles;
+
+ private List<KeyedBackendSerializationProxy.StateMetaInfo<?,
?>> stateMetaInfos = new ArrayList<>();
+
+ private FileSystem backupFileSystem;
+ private Path backupPath;
+
+ private FSDataInputStream inputStream = null;
+ private CheckpointStreamFactory.CheckpointStateOutputStream
outputStream = null;
+
+ // new sst files since the last completed checkpoint
+ private Set<String> newSstFileNames = new HashSet<>();
+
+ // handles to the sst files in the current snapshot
+ private Map<String, StreamStateHandle> sstFiles = new
HashMap<>();
+
+ // handles to the misc files in the current snapshot
+ private Map<String, StreamStateHandle> miscFiles = new
HashMap<>();
+
+ private StreamStateHandle metaStateHandle = null;
+
/**
 * Creates a snapshot operation bound to one backend and one checkpoint.
 *
 * @param stateBackend            the backend whose state is being snapshotted
 * @param checkpointStreamFactory factory used to create checkpoint output streams
 * @param checkpointId            id of the checkpoint being taken
 * @param checkpointTimestamp     timestamp of the checkpoint being taken
 */
private RocksDBIncrementalSnapshotOperation(
		RocksDBKeyedStateBackend<?> stateBackend,
		CheckpointStreamFactory checkpointStreamFactory,
		long checkpointId,
		long checkpointTimestamp) {

	// plain field captures; the operation is single-use per checkpoint
	this.checkpointTimestamp = checkpointTimestamp;
	this.checkpointId = checkpointId;
	this.checkpointStreamFactory = checkpointStreamFactory;
	this.stateBackend = stateBackend;
}
+
+ private StreamStateHandle materializeStateData(Path filePath)
throws Exception {
+ try {
+ final byte[] buffer = new byte[1024];
+
+ FileSystem backupFileSystem =
backupPath.getFileSystem();
+ inputStream = backupFileSystem.open(filePath);
+
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
+
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ while (true) {
+ int numBytes = inputStream.read(buffer);
+
+ if (numBytes == -1) {
+ break;
+ }
+
+ outputStream.write(buffer, 0, numBytes);
+ }
+
+ return outputStream.closeAndGetHandle();
+ } finally {
+ if (inputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
+ inputStream.close();
+ inputStream = null;
+ }
+
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+ }
+
+ private StreamStateHandle materializeMetaData() throws
Exception {
+ try {
+ outputStream = checkpointStreamFactory
+
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+
+ KeyedBackendSerializationProxy
serializationProxy =
+ new
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
+ DataOutputView out = new
DataOutputViewStreamWrapper(outputStream);
+
+ serializationProxy.write(out);
+
+ return outputStream.closeAndGetHandle();
+ } finally {
+ if (outputStream != null) {
+
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+ outputStream.close();
+ outputStream = null;
+ }
+ }
+ }
+
+ void takeSnapshot() throws Exception {
+ // use the last completed checkpoint as the comparison
base.
+ baseSstFiles =
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+
+ // save meta data
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle,
RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry :
stateBackend.kvStateInformation.entrySet()) {
+
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo =
stateMetaInfoEntry.getValue().f1;
+
+ KeyedBackendSerializationProxy.StateMetaInfo<?,
?> metaInfoProxy =
+ new
KeyedBackendSerializationProxy.StateMetaInfo<>(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+
metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
+
+ stateMetaInfos.add(metaInfoProxy);
+ }
+
+ // save state data
+ backupPath = new
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
+ backupFileSystem = backupPath.getFileSystem();
+ if (backupFileSystem.exists(backupPath)) {
+ LOG.warn("Deleting an existing local checkpoint
directory " +
+ backupPath + ".");
+
+ backupFileSystem.delete(backupPath, true);
+ }
+
+ // create hard links of living files in the checkpoint
path
+ Checkpoint checkpoint =
Checkpoint.create(stateBackend.db);
+ checkpoint.createCheckpoint(backupPath.getPath());
--- End diff --
@StefanRRichter Thanks for your explanation. Your method of allowing fast
checkpoints is very smart. I think it can help us avoid the small sst files
produced by flushing. But I think we should also be careful about the flushing
of the buffer, because it might block processing if the buffer is very big.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---