XComp commented on a change in pull request #17485:
URL: https://github.com/apache/flink/pull/17485#discussion_r786874117



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
##########
@@ -78,21 +84,46 @@ public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor)
             throws Exception {
+        final String configMapName = getConfigMapNameFunction.apply(jobID);
+        KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient, configMapName, clusterId);
 
         return KubernetesUtils.createCompletedCheckpointStore(
                 configuration,
                 kubeClient,
                 executor,
-                getConfigMapNameFunction.apply(jobID),
+                configMapName,
                 lockIdentity,
                 maxNumberOfCheckpointsToRetain,
                 sharedStateRegistryFactory,
                 ioExecutor);
     }
 
     @Override
-    public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) {
-        return new KubernetesCheckpointIDCounter(
-                kubeClient, getConfigMapNameFunction.apply(jobID), lockIdentity);
+    public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {

Review comment:
       To me, it would feel more natural to have a subclass of 
`KubernetesCheckpointRecoveryFactory` that takes care of the ConfigMap 
creation, so that the base factory keeps assuming the ConfigMap already 
exists. That said, I don't have a strong argument for refactoring this code, 
because we wouldn't gain much from it.
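
For illustration only, the subclass idea could look roughly like the sketch below. It is self-contained and uses simplified stand-ins, not the real Flink classes: `FakeKubeClient`, `BaseFactory`, and `ConfigMapCreatingFactory` are hypothetical names, job IDs are plain strings, and the "counter" is just a string so the example can run on its own.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Sketch only: simplified stand-ins for the classes touched by this PR.
final class ConfigMapCreatingFactorySketch {

    /** Hypothetical stand-in for the Kubernetes client; it only records ConfigMap names. */
    static class FakeKubeClient {
        final Set<String> configMaps = new HashSet<>();

        /** Idempotent: adding an existing name leaves the set unchanged. */
        void createConfigMapIfItDoesNotExist(String name) {
            configMaps.add(name);
        }
    }

    /** Stand-in for the base factory: it assumes the ConfigMap already exists. */
    static class BaseFactory {
        protected final FakeKubeClient kubeClient;
        protected final Function<String, String> getConfigMapNameFunction;

        BaseFactory(FakeKubeClient kubeClient, Function<String, String> getConfigMapNameFunction) {
            this.kubeClient = kubeClient;
            this.getConfigMapNameFunction = getConfigMapNameFunction;
        }

        /** Returns a string in place of a real CheckpointIDCounter. */
        String createCheckpointIDCounter(String jobId) {
            return "counter@" + getConfigMapNameFunction.apply(jobId);
        }
    }

    /** The suggested subclass: ensures the ConfigMap exists, then delegates to the base. */
    static class ConfigMapCreatingFactory extends BaseFactory {
        ConfigMapCreatingFactory(
                FakeKubeClient kubeClient, Function<String, String> getConfigMapNameFunction) {
            super(kubeClient, getConfigMapNameFunction);
        }

        @Override
        String createCheckpointIDCounter(String jobId) {
            kubeClient.createConfigMapIfItDoesNotExist(getConfigMapNameFunction.apply(jobId));
            return super.createCheckpointIDCounter(jobId);
        }
    }

    public static void main(String[] args) {
        FakeKubeClient client = new FakeKubeClient();
        BaseFactory factory = new ConfigMapCreatingFactory(client, id -> id + "-config-map");

        System.out.println(factory.createCheckpointIDCounter("job-1")); // counter@job-1-config-map
        System.out.println(client.configMaps.contains("job-1-config-map")); // true
    }
}
```

The creation step would then live in one place, and callers that know the ConfigMap already exists could keep using the base factory. The price is an extra class for what is currently a single additional call, which is why the gain looks small.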




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

