XComp commented on a change in pull request #17485: URL: https://github.com/apache/flink/pull/17485#discussion_r786874117
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java ########## @@ -78,21 +84,46 @@ public CompletedCheckpointStore createRecoveredCompletedCheckpointStore( SharedStateRegistryFactory sharedStateRegistryFactory, Executor ioExecutor) throws Exception { + final String configMapName = getConfigMapNameFunction.apply(jobID); + KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient, configMapName, clusterId); return KubernetesUtils.createCompletedCheckpointStore( configuration, kubeClient, executor, - getConfigMapNameFunction.apply(jobID), + configMapName, lockIdentity, maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory, ioExecutor); } @Override - public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) { - return new KubernetesCheckpointIDCounter( - kubeClient, getConfigMapNameFunction.apply(jobID), lockIdentity); + public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception { Review comment: To me, it would feel more natural to have a subclass of `KubernetesCheckpointRecoveryFactory` that takes care of the ConfigMap creation. But I don't have a strong argument towards refactoring this code because we wouldn't gain much from such a refactoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org