Zakelly commented on code in PR #24879:
URL: https://github.com/apache/flink/pull/24879#discussion_r1764914273


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategy.java:
##########
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.snapshot;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.checkpoint.SnapshotType;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
+import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo;
+import org.apache.flink.state.forst.ForStResourceContainer;
+import org.apache.flink.state.forst.ForStStateDataTransfer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static 
org.apache.flink.state.forst.snapshot.ForStSnapshotUtil.CURRENT_FILE_NAME;
+import static 
org.apache.flink.state.forst.snapshot.ForStSnapshotUtil.MANIFEST_FILE_PREFIX;
+import static 
org.apache.flink.state.forst.snapshot.ForStSnapshotUtil.SST_FILE_SUFFIX;
+
+/**
+ * Snapshot strategy for {@link 
org.apache.flink.state.forst.ForStKeyedStateBackend} that is based
+ * on disableFileDeletions()+getLiveFiles() of ForSt and creates incremental 
snapshots.
+ *
+ * @param <K> type of the backend keys.
+ */
+public class ForStIncrementalSnapshotStrategy<K>
+        extends ForStSnapshotStrategyBase<
+                K, ForStSnapshotStrategyBase.ForStNativeSnapshotResources> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ForStIncrementalSnapshotStrategy.class);
+
+    private static final String DESCRIPTION = "Asynchronous incremental ForSt 
snapshot";
+
+    /**
+     * Stores the {@link StreamStateHandle} and corresponding local path of 
uploaded SST files that
+     * build the incremental history. Once the checkpoint is confirmed by JM, 
they can be reused for
+     * incremental checkpoint.
+     */
+    @Nonnull private final SortedMap<Long, Collection<HandleAndLocalPath>> 
uploadedSstFiles;
+
+    /** The identifier of the last completed checkpoint. */
+    private long lastCompletedCheckpointId;
+
+    /** The helper class used to upload state files. */
+    private final ForStStateDataTransfer stateTransfer;
+
+    /**
+     * Creates the incremental snapshot strategy.
+     *
+     * <p>All arguments except {@code uploadedStateHandles}, {@code stateTransfer} and {@code
+     * lastCompletedCheckpointId} are forwarded unchanged to the {@link ForStSnapshotStrategyBase}
+     * constructor together with the fixed description of this strategy.
+     *
+     * @param uploadedStateHandles uploaded SST file handles per checkpoint id; the map is copied
+     *     (shallowly) into a new {@link TreeMap}, the contained collections are shared.
+     * @param stateTransfer helper used to upload state files; it is closed by {@link #close()}.
+     * @param lastCompletedCheckpointId initial value of the last completed checkpoint id.
+     */
+    public ForStIncrementalSnapshotStrategy(
+            @Nonnull RocksDB db,
+            @Nonnull ResourceGuard forstResourceGuard,
+            @Nonnull ForStResourceContainer resourceContainer,
+            @Nonnull TypeSerializer<K> keySerializer,
+            @Nonnull LinkedHashMap<String, ForStKvStateInfo> 
kvStateInformation,
+            @Nonnull KeyGroupRange keyGroupRange,
+            @Nonnegative int keyGroupPrefixBytes,
+            @Nonnull UUID backendUID,
+            @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> 
uploadedStateHandles,
+            @Nonnull ForStStateDataTransfer stateTransfer,
+            long lastCompletedCheckpointId) {
+
+        super(
+                DESCRIPTION,
+                db,
+                forstResourceGuard,
+                resourceContainer,
+                keySerializer,
+                kvStateInformation,
+                keyGroupRange,
+                keyGroupPrefixBytes,
+                backendUID);
+
+        this.uploadedSstFiles = new TreeMap<>(uploadedStateHandles);
+        this.stateTransfer = stateTransfer;
+        this.lastCompletedCheckpointId = lastCompletedCheckpointId;
+    }
+
+    /**
+     * Builds the supplier that performs the asynchronous part of the snapshot.
+     *
+     * <p>If the snapshot resources carry no state meta info, the returned supplier yields {@link
+     * SnapshotResult#empty()}. Otherwise the sharing strategy of the checkpoint type decides
+     * whether the previous snapshot is kept ({@code FORWARD_BACKWARD}) or replaced by {@code
+     * EMPTY_PREVIOUS_SNAPSHOT} ({@code FORWARD}, {@code NO_SHARING}) before a {@link
+     * ForStIncrementalSnapshotOperation} is created.
+     *
+     * <p>NOTE(review): the debug message says "Returning null", but the code returns a supplier
+     * of an empty {@link SnapshotResult} - confirm whether the message should be updated.
+     *
+     * @param snapshotResources resources prepared for this snapshot; its previous snapshot may be
+     *     overwritten here.
+     * @param checkpointId id of the checkpoint, handed to the snapshot operation.
+     * @param timestamp only used in the debug log message of the empty-state case.
+     * @param checkpointStreamFactory stream factory handed to the snapshot operation.
+     * @param checkpointOptions source of the checkpoint type and thereby the sharing strategy.
+     * @return the supplier that produces the keyed state handle of this snapshot.
+     * @throws IllegalArgumentException if the sharing files strategy is none of the three handled
+     *     values.
+     */
+    @Override
+    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
+            ForStNativeSnapshotResources snapshotResources,
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+            @Nonnull CheckpointOptions checkpointOptions) {
+
+        if (snapshotResources.stateMetaInfoSnapshots.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Asynchronous ForSt snapshot performed on empty keyed 
state at {}. Returning null.",
+                        timestamp);
+            }
+            return registry -> SnapshotResult.empty();
+        }
+
+        final CheckpointType.SharingFilesStrategy sharingFilesStrategy =
+                
checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+
+        switch (sharingFilesStrategy) {
+            case FORWARD_BACKWARD:
+                // incremental checkpoint, keep the original PreviousSnapshot
+                break;
+            case FORWARD:
+            case NO_SHARING:
+                // full checkpoint, use the empty PreviousSnapshot
+                snapshotResources.setPreviousSnapshot(EMPTY_PREVIOUS_SNAPSHOT);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported sharing files strategy: " + 
sharingFilesStrategy);
+        }
+
+        return new ForStIncrementalSnapshotOperation(
+                checkpointId, snapshotResources, checkpointStreamFactory, 
sharingFilesStrategy);
+    }
+
+    /**
+     * Records a completed checkpoint as the new base for incremental snapshots.
+     *
+     * <p>While holding the lock on {@code uploadedSstFiles}, the notification is only applied if
+     * the id is newer than the last completed one and SST files were recorded for it. In that
+     * case all entries of older checkpoints are dropped and the last completed id is advanced.
+     *
+     * @param completedCheckpointId id of the checkpoint that was confirmed.
+     */
+    @Override
+    public void notifyCheckpointComplete(long completedCheckpointId) {
+        synchronized (uploadedSstFiles) {
+            LOG.info("Backend:{} checkpoint:{} complete.", backendUID, 
completedCheckpointId);
+
+            // FLINK-23949: uploadedSstFiles.containsKey(completedCheckpointId) makes sure
+            // the notified checkpointId is not a savepoint, otherwise the next checkpoint
+            // will degenerate into a full checkpoint
+            if (completedCheckpointId > lastCompletedCheckpointId
+                    && uploadedSstFiles.containsKey(completedCheckpointId)) {
+                uploadedSstFiles
+                        .keySet()
+                        .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
+                lastCompletedCheckpointId = completedCheckpointId;
+            }
+        }
+    }
+
+    /**
+     * Forgets the SST files recorded for an aborted checkpoint.
+     *
+     * <p>The entry is removed from {@code uploadedSstFiles} while holding its lock; nothing
+     * happens to the map if no entry exists for the id.
+     *
+     * @param abortedCheckpointId id of the checkpoint that was aborted.
+     */
+    @Override
+    public void notifyCheckpointAborted(long abortedCheckpointId) {
+        synchronized (uploadedSstFiles) {
+            LOG.info("Backend:{} checkpoint:{} aborted.", backendUID, 
abortedCheckpointId);
+            uploadedSstFiles.keySet().remove(abortedCheckpointId);
+        }
+    }
+
+    /** Closes the state transfer helper that was passed to the constructor. */
+    @Override
+    public void close() {
+        stateTransfer.close();
+    }
+
+    /**
+     * Captures the comparison base and the state meta information for a new snapshot.
+     *
+     * <p>The last completed checkpoint id and the SST files recorded for it are read together
+     * under the lock on {@code uploadedSstFiles}. Afterwards the meta info snapshot of every
+     * entry in {@code kvStateInformation} is appended to {@code stateMetaInfoSnapshots}.
+     *
+     * @param checkpointId id of the checkpoint being taken; only used for trace logging here.
+     * @param stateMetaInfoSnapshots output list that receives one snapshot per registered state.
+     * @return a {@code PreviousSnapshot} wrapping the SST files of the last completed checkpoint;
+     *     the wrapped collection is {@code null} when no files are recorded for that id.
+     */
+    @Override
+    protected PreviousSnapshot snapshotMetaData(
+            long checkpointId, @Nonnull List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots) {
+
+        final long lastCompletedCheckpoint;
+        final Collection<HandleAndLocalPath> confirmedSstFiles;
+
+        // use the last completed checkpoint as the comparison base.
+        synchronized (uploadedSstFiles) {
+            lastCompletedCheckpoint = lastCompletedCheckpointId;
+            confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
+            LOG.trace(
+                    "Use confirmed SST files for checkpoint {}: {}",
+                    checkpointId,
+                    confirmedSstFiles);
+        }
+        LOG.trace(
+                "Taking incremental snapshot for checkpoint {}. Snapshot is 
based on last completed checkpoint {} "
+                        + "assuming the following (shared) confirmed files as 
base: {}.",
+                checkpointId,
+                lastCompletedCheckpoint,
+                confirmedSstFiles);
+
+        // snapshot meta data to save
+        for (Map.Entry<String, ForStKvStateInfo> stateMetaInfoEntry :
+                kvStateInformation.entrySet()) {
+            
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
+        }
+        return new PreviousSnapshot(confirmedSstFiles);
+    }
+
+    /** Encapsulates the process to perform an incremental snapshot of a 
ForStKeyedStateBackend. */
+    private final class ForStIncrementalSnapshotOperation extends 
ForStSnapshotOperation {
+
+        @Nonnull private final SnapshotType.SharingFilesStrategy 
sharingFilesStrategy;
+
+        /**
+         * Creates the operation for one checkpoint.
+         *
+         * @param checkpointId id of the checkpoint, forwarded to the super constructor.
+         * @param snapshotResources resources of this snapshot, forwarded to the super
+         *     constructor.
+         * @param checkpointStreamFactory stream factory, forwarded to the super constructor.
+         * @param sharingFilesStrategy strategy that later selects the state scope of uploads and
+         *     whether the uploaded SST handles are remembered for reuse.
+         */
+        private ForStIncrementalSnapshotOperation(
+                long checkpointId,
+                @Nonnull ForStNativeSnapshotResources snapshotResources,
+                @Nonnull CheckpointStreamFactory checkpointStreamFactory,
+                @Nonnull SnapshotType.SharingFilesStrategy 
sharingFilesStrategy) {
+
+            super(checkpointId, snapshotResources, checkpointStreamFactory);
+            this.sharingFilesStrategy = sharingFilesStrategy;
+        }
+
+        /**
+         * Runs the snapshot: materializes the meta data, transfers the snapshot files and wraps
+         * everything into an {@link IncrementalRemoteKeyedStateHandle}.
+         *
+         * <p>The snapshot resources are released in all cases. On failure the temporary
+         * resources registry is closed (a failure of that close is only logged); on success the
+         * reused state handles are reported to the checkpoint stream factory.
+         *
+         * @param snapshotCloseableRegistry registry handed to the meta data and file transfer
+         *     steps.
+         * @return the snapshot result holding the incremental keyed state handle.
+         * @throws Exception if materializing the meta data or transferring files fails.
+         */
+        @Override
+        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry 
snapshotCloseableRegistry)
+                throws Exception {
+
+            // Flipped to true only after the state handle was built successfully.
+            boolean completed = false;
+
+            final List<StreamStateHandle> reusedHandle = new ArrayList<>();
+
+            try {
+
+                // Handle to the meta data file
+                SnapshotResult<StreamStateHandle> metaStateHandle =
+                        materializeMetaData(
+                                snapshotCloseableRegistry,
+                                tmpResourcesRegistry,
+                                snapshotResources.stateMetaInfoSnapshots,
+                                checkpointId,
+                                checkpointStreamFactory);
+
+                final List<HandleAndLocalPath> sstFiles = new ArrayList<>();
+                final List<HandleAndLocalPath> miscFiles = new ArrayList<>();
+
+                // Checkpointed size = meta data size + bytes uploaded by the file transfer.
+                long checkpointedSize = metaStateHandle.getStateSize();
+                checkpointedSize +=
+                        transferSnapshotFiles(
+                                sstFiles,
+                                miscFiles,
+                                snapshotCloseableRegistry,
+                                tmpResourcesRegistry,
+                                reusedHandle);
+
+                final IncrementalRemoteKeyedStateHandle 
jmIncrementalKeyedStateHandle =
+                        new IncrementalRemoteKeyedStateHandle(
+                                backendUID,
+                                keyGroupRange,
+                                checkpointId,
+                                sstFiles,
+                                miscFiles,
+                                metaStateHandle.getJobManagerOwnedSnapshot(),
+                                checkpointedSize);
+
+                completed = true;
+
+                return SnapshotResult.of(jmIncrementalKeyedStateHandle);
+            } finally {
+                snapshotResources.release();
+
+                if (!completed) {
+                    try {
+                        tmpResourcesRegistry.close();
+                    } catch (Exception e) {
+                        LOG.warn("Could not properly clean tmp resources.", e);
+                    }
+
+                } else {
+                    // Report the reuse of state handle to stream factory, 
which is essential for
+                    // file merging mechanism.
+                    
checkpointStreamFactory.reusePreviousStateHandle(reusedHandle);
+                }
+            }
+        }
+
+        /**
+         * Transfers the files of this snapshot to the checkpoint filesystem and returns the
+         * total uploaded size.
+         *
+         * <p>Returns 0 without touching the containers when there are no live files. Otherwise
+         * the handles in field {@code f0} of the {@code classifyFiles()} result are added to
+         * {@code sstHandles} without a new upload and reported through {@code reusedHandle};
+         * the files in {@code f1} and {@code f2} and the manifest file {@code f3} are uploaded,
+         * and the CURRENT file is written from the manifest file name. Uploads use the {@code
+         * EXCLUSIVE} scope for {@code NO_SHARING} and the {@code SHARED} scope otherwise.
+         *
+         * @param sstHandles Empty container, all sst files which should be included in the
+         *     checkpoint will be put in it. It has to be empty on entry, because every handle it
+         *     holds right after adding the classified handles is reported as reused.
+         * @param metaHandles Empty container, all meta files (including the manifest file and
+         *     the CURRENT file) which should be included in the checkpoint will be put in it.
+         * @param snapshotCloseableRegistry registry passed on to the state transfer calls.
+         * @param tmpResourcesRegistry registry passed on to the state transfer calls.
+         * @param reusedHandle receives the stream state handles of the reused sst files.
+         * @return Total bytes transferred to the checkpoint filesystem.
+         * @throws Exception propagated from the state transfer calls.
+         */
+        private long transferSnapshotFiles(
+                @Nonnull List<HandleAndLocalPath> sstHandles,
+                @Nonnull List<HandleAndLocalPath> metaHandles,
+                @Nonnull CloseableRegistry snapshotCloseableRegistry,
+                @Nonnull CloseableRegistry tmpResourcesRegistry,
+                @Nonnull List<StreamStateHandle> reusedHandle)
+                throws Exception {
+
+            Preconditions.checkNotNull(
+                    snapshotResources.liveFiles, "liveFiles were not properly 
created.");
+
+            if (snapshotResources.liveFiles.isEmpty()) {
+                return 0;
+            }
+
+            Tuple4<List<HandleAndLocalPath>, List<Path>, List<Path>, Path> 
classifiedFiles =
+                    classifyFiles();
+
+            sstHandles.addAll(classifiedFiles.f0);
+            // Collect the reuse of state handle.
+            
sstHandles.stream().map(HandleAndLocalPath::getHandle).forEach(reusedHandle::add);
+
+            final CheckpointedStateScope stateScope =
+                    sharingFilesStrategy == 
SnapshotType.SharingFilesStrategy.NO_SHARING
+                            ? CheckpointedStateScope.EXCLUSIVE
+                            : CheckpointedStateScope.SHARED;
+
+            long transferBytes = 0;
+
+            List<HandleAndLocalPath> sstFilesTransferResult =
+                    stateTransfer.transferFilesToCheckpointFs(
+                            classifiedFiles.f1,
+                            checkpointStreamFactory,
+                            stateScope,
+                            snapshotCloseableRegistry,
+                            tmpResourcesRegistry);
+
+            sstHandles.addAll(sstFilesTransferResult);
+            transferBytes +=
+                    sstFilesTransferResult.stream()
+                            .mapToLong(HandleAndLocalPath::getStateSize)
+                            .sum();
+
+            List<HandleAndLocalPath> miscFilesTransferResult =
+                    stateTransfer.transferFilesToCheckpointFs(
+                            classifiedFiles.f2,
+                            checkpointStreamFactory,
+                            stateScope,
+                            snapshotCloseableRegistry,
+                            tmpResourcesRegistry);
+            metaHandles.addAll(miscFilesTransferResult);
+            transferBytes +=
+                    miscFilesTransferResult.stream()
+                            .mapToLong(HandleAndLocalPath::getStateSize)
+                            .sum();
+
+            HandleAndLocalPath manifestFileTransferResult =
+                    stateTransfer.transferFileToCheckpointFs(
+                            classifiedFiles.f3,
+                            snapshotResources.manifestFileSize,
+                            checkpointStreamFactory,
+                            stateScope,
+                            snapshotCloseableRegistry,
+                            tmpResourcesRegistry);
+            metaHandles.add(manifestFileTransferResult);
+            transferBytes += manifestFileTransferResult.getStateSize();
+
+            // To prevent the content of the current file change, the manifest 
file name is directly
+            // used to rewrite the current file during the checkpoint 
asynchronous phase.
+            HandleAndLocalPath currentFileWriteResult =
+                    stateTransfer.writeFileToCheckpointFs(
+                            CURRENT_FILE_NAME,
+                            snapshotResources.manifestFileName,
+                            checkpointStreamFactory,
+                            stateScope,
+                            snapshotCloseableRegistry,
+                            tmpResourcesRegistry);
+            metaHandles.add(currentFileWriteResult);
+            transferBytes += currentFileWriteResult.getStateSize();
+
+            recordReusableHandles(sstHandles);
+
+            return transferBytes;
+        }
+
+        /**
+         * Remembers the sst handles of this checkpoint so that later checkpoints can reuse them.
+         *
+         * <p>While holding the lock on {@code uploadedSstFiles}, an unmodifiable view of the
+         * handles is stored under the checkpoint id for {@code FORWARD_BACKWARD} and {@code
+         * FORWARD}; nothing is stored for {@code NO_SHARING}.
+         *
+         * @param sstHandles all sst handles that are part of this checkpoint.
+         * @throws IllegalArgumentException if the sharing files strategy is none of the three
+         *     handled values.
+         */
+        private void recordReusableHandles(List<HandleAndLocalPath> 
sstHandles) {
+            synchronized (uploadedSstFiles) {
+                switch (sharingFilesStrategy) {
+                    case FORWARD_BACKWARD:
+                    case FORWARD:
+                        uploadedSstFiles.put(
+                                checkpointId, 
Collections.unmodifiableList(sstHandles));
+                        break;
+                    case NO_SHARING:
+                        break;
+                    default:
+                        // This is just a safety precaution. It is checked 
before creating the
+                        // ForStIncrementalSnapshotOperation
+                        throw new IllegalArgumentException(
+                                "Unsupported sharing files strategy: " + 
sharingFilesStrategy);
+                }
+            }
+        }
+
+        private Tuple4<List<HandleAndLocalPath>, List<Path>, List<Path>, Path> 
classifyFiles() {
+            int totalFileNum = snapshotResources.liveFiles.size();
+
+            List<HandleAndLocalPath> transferredSstHandles = new 
ArrayList<>(totalFileNum);
+
+            List<Path> toTransferSstFiles = new ArrayList<>(totalFileNum);
+            List<Path> toTransferMiscFiles = new ArrayList<>(totalFileNum);
+            Path toTransferManifestFile = null;
+
+            for (Path filePath : snapshotResources.liveFiles) {
+                final String fileName = filePath.getName();
+
+                if (fileName.startsWith(MANIFEST_FILE_PREFIX)) {

Review Comment:
   Shall we use:
   ```suggestion
                   if (fileName.equals(snapshotResources.manifestFileName)) {
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to