masteryhx commented on code in PR #22744:
URL: https://github.com/apache/flink/pull/22744#discussion_r1268880413


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -375,6 +378,51 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot(
             @Nonnull CheckpointStreamFactory streamFactory,
             @Nonnull CheckpointOptions checkpointOptions)
             throws Exception {
+
+        if (checkpointOptions.getCheckpointType().isSavepoint()) {
+            SnapshotType.SharingFilesStrategy sharingFilesStrategy =
+                    
checkpointOptions.getCheckpointType().getSharingFilesStrategy();
+            if (sharingFilesStrategy == 
SnapshotType.SharingFilesStrategy.NO_SHARING) {
+                long materializationID = materializedId++;
+                // For NO_SHARING native savepoint, trigger delegated one
+                RunnableFuture<SnapshotResult<KeyedStateHandle>> 
delegatedSnapshotResult =
+                        keyedStateBackend.snapshot(
+                                materializationID, timestamp, streamFactory, 
checkpointOptions);

Review Comment:
   I think notification about checkpoint complete could be done like normal 
checkpoint by add mapping in the materializationIdByCheckpointId.
   notification about checkpoint abortion has not done for normal checkpoint 
(FLINK-25850), so I think it could be considered together in the FLINK-25850.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to