[ 
https://issues.apache.org/jira/browse/FLINK-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587345#comment-16587345
 ] 

ASF GitHub Bot commented on FLINK-10042:
----------------------------------------

StefanRRichter commented on a change in pull request #6556: FLINK-10042][state] 
Extract snapshot algorithms from inner classes of RocksDBKeyedStateBackend into 
full classes
URL: https://github.com/apache/flink/pull/6556#discussion_r211583899
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##########
 @@ -508,41 +486,87 @@ public void restore(Collection<KeyedStateHandle> 
restoreState) throws Exception
                restoredKvStateMetaInfos.clear();
 
                try {
+                       RocksDBIncrementalRestoreOperation<K> 
incrementalRestoreOperation = null;
                        if (restoreState == null || restoreState.isEmpty()) {
                                createDB();
                        } else {
                                KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
                                if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
                                        || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
-                                       RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-                                       restoreOperation.restore(restoreState);
+                                       incrementalRestoreOperation = new 
RocksDBIncrementalRestoreOperation<>(this);
+                                       
incrementalRestoreOperation.restore(restoreState);
                                } else {
-                                       RocksDBFullRestoreOperation<K> 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
-                                       
restoreOperation.doRestore(restoreState);
+                                       RocksDBFullRestoreOperation<K> 
fullRestoreOperation = new RocksDBFullRestoreOperation<>(this);
+                                       
fullRestoreOperation.doRestore(restoreState);
                                }
                        }
+
+                       initializeSnapshotStrategy(incrementalRestoreOperation);
                } catch (Exception ex) {
                        dispose();
                        throw ex;
                }
        }
 
-       @Override
-       public void notifyCheckpointComplete(long completedCheckpointId) {
-
-               if (!enableIncrementalCheckpointing) {
-                       return;
+       @VisibleForTesting
+       void initializeSnapshotStrategy(
+               @Nullable RocksDBIncrementalRestoreOperation<K> 
incrementalRestoreOperation) {
+
+               this.savepointSnapshotStrategy =
+                       new RocksFullSnapshotStrategy<>(
+                               db,
+                               rocksDBResourceGuard,
+                               keySerializer,
+                               kvStateInformation,
+                               keyGroupRange,
+                               keyGroupPrefixBytes,
+                               localRecoveryConfig,
+                               cancelStreamRegistry,
+                               keyGroupCompressionDecorator);
+
+               if (enableIncrementalCheckpointing) {
+                       final UUID backendUID;
+                       final SortedMap<Long, Set<StateHandleID>> 
materializedSstFiles;
+                       final long lastCompletedCheckpointId;
+
+                       if (incrementalRestoreOperation == null) {
+                               backendUID = UUID.randomUUID();
+                               materializedSstFiles = new TreeMap<>();
+                               lastCompletedCheckpointId = -1L;
+                       } else {
+                               backendUID = 
Preconditions.checkNotNull(incrementalRestoreOperation.getRestoredBackendUID());
 
 Review comment:
   There are still more steps in my plan of refactoring this class, and the 
next is construction/instatiation/restore. I also want to offload all those 
object construction concerns into a separate builder. That builder will also 
just forward all the mentioned things directly to the incremental snapshot 
operation instance. The backend itself will have a private construction that 
can do just final field assignments. That being said, I could group together 
this information in a coherent way when I will take that next step.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract snapshot algorithms from inner classes into full classes
> ----------------------------------------------------------------
>
>                 Key: FLINK-10042
>                 URL: https://issues.apache.org/jira/browse/FLINK-10042
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to