tillrohrmann commented on a change in pull request #14837:
URL: https://github.com/apache/flink/pull/14837#discussion_r571130872
##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -360,7 +364,12 @@ public void onError(List<KubernetesPod> pods) {
         @Override
         public void handleFatalError(Throwable throwable) {

Review comment:
       Maybe we should rename this method to `handleError`, because it now also handles non-fatal errors such as the `KubernetesTooOldResourceVersionException`.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
##########
@@ -45,7 +47,16 @@ public void onClose(KubernetesClientException cause) {
         if (cause == null) {
             logger.info("The watcher is closing.");
         } else {
-            callbackHandler.handleFatalError(cause);
+            if (cause.getCode() == HTTP_GONE) {

Review comment:
       Maybe add a link to the explanation of this Fabric8 behaviour, i.e. why a watch closed with HTTP 410 (Gone) has to be recreated.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -235,9 +238,16 @@ public void onError(List<KubernetesConfigMap> configMaps) {
         @Override
         public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderElectionException(
-                            "Error while watching the ConfigMap " + configMapName, throwable));
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                kubernetesWatch =
+                        kubeClient.watchConfigMaps(
+                                configMapName, new ConfigMapCallbackHandlerImpl());

Review comment:
       I think this violates the threading model of this component, which does not allow state changes from a different thread. We either need to make this component thread safe or tell the `LeaderElectionEventHandler` about the required restart.
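A minimal sketch of the first option (making the component thread safe), combined with the HTTP 410 translation from the `AbstractKubernetesWatcher` hunk and the suggested `handleError` name. Every type here (`WatchHandle`, `WatchCallbackHandler`, `WatcherSketch`, `ThreadSafeWatchDriver`, `TooOldResourceVersionException`) is a hypothetical stand-in, not a Flink or Fabric8 API. It assumes creating a watch is acceptable while holding the lock; since a real watch creation talks to the API server, that is a trade-off the actual fix would have to weigh.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's KubernetesTooOldResourceVersionException.
class TooOldResourceVersionException extends Exception {
    TooOldResourceVersionException(Throwable cause) {
        super(cause);
    }
}

// Hypothetical stand-in for a watch handle; close() must be idempotent.
interface WatchHandle extends AutoCloseable {
    @Override
    void close();
}

// The renamed callback: it receives both recoverable and fatal errors.
interface WatchCallbackHandler {
    void handleError(Throwable throwable);
}

final class WatcherSketch {
    static final int HTTP_GONE = 410;

    /** Mirrors the onClose logic under review: cause == null means a normal close. */
    static void onClose(int httpCode, Exception cause, WatchCallbackHandler handler) {
        if (cause == null) {
            return;
        }
        if (httpCode == HTTP_GONE) {
            // 410 Gone: the watched resource version is too old, so a new watch is needed.
            handler.handleError(new TooOldResourceVersionException(cause));
        } else {
            handler.handleError(cause);
        }
    }
}

/** Hypothetical driver that guards its watch with a lock so any thread may call it. */
final class ThreadSafeWatchDriver implements WatchCallbackHandler {
    private final Object lock = new Object();
    private final Supplier<WatchHandle> watchFactory;
    private final Consumer<Throwable> fatalErrorHandler;
    private WatchHandle watch; // guarded by lock
    private boolean running = true; // guarded by lock

    ThreadSafeWatchDriver(Supplier<WatchHandle> watchFactory, Consumer<Throwable> fatalErrorHandler) {
        this.watchFactory = watchFactory;
        this.fatalErrorHandler = fatalErrorHandler;
        synchronized (lock) {
            this.watch = watchFactory.get();
        }
    }

    @Override
    public void handleError(Throwable throwable) {
        if (throwable instanceof TooOldResourceVersionException) {
            synchronized (lock) {
                // Without the lock, a concurrent close() could miss the new watch and leak it.
                if (running) {
                    watch.close();
                    watch = watchFactory.get();
                }
            }
        } else {
            fatalErrorHandler.accept(throwable);
        }
    }

    void close() {
        synchronized (lock) {
            running = false;
            watch.close();
        }
    }
}
```

The `running` flag matters as much as the lock: once the driver is closed, a late 410 from the watcher thread must not resurrect a watch that nobody will ever close.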
##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -360,7 +364,12 @@ public void onError(List<KubernetesPod> pods) {
         @Override
         public void handleFatalError(Throwable throwable) {
-            getResourceEventHandler().onError(throwable);
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                log.info("Creating a new watch on TaskManager pods.");
+                podsWatchOpt = watchTaskManagerPods();

Review comment:
       I think this should happen in the main thread context, because we are modifying the state of the `KubernetesResourceManagerDriver`.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
##########
@@ -115,9 +120,16 @@ public void onError(List<KubernetesConfigMap> configMaps) {
         @Override
         public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderRetrievalException(
-                            "Error while watching the ConfigMap " + configMapName));
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                kubernetesWatch =
+                        kubeClient.watchConfigMaps(
+                                configMapName, new ConfigMapCallbackHandlerImpl());

Review comment:
       I think the same applies here: the state is changed from a different thread.
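A minimal sketch of running the re-watch in the main thread context, assuming the driver owns an `Executor` that runs tasks in its main thread. `ManualMainThreadExecutor` is a hypothetical test stand-in for that executor, and `MainThreadRewatchDriver`, `ResourceVersionTooOldException`, and the string watch ids are also hypothetical, not Flink APIs. The watcher thread only enqueues work; the watch field is read and written exclusively by tasks on the main thread.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical stand-in for KubernetesTooOldResourceVersionException.
class ResourceVersionTooOldException extends RuntimeException {
    ResourceVersionTooOldException(String message) {
        super(message);
    }
}

/** Test stand-in for a main thread executor: tasks run only when runAll() is called. */
final class ManualMainThreadExecutor implements Executor {
    // Concurrent queue because execute() may be called from the watcher thread.
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
    }

    /** Runs all queued tasks in the calling ("main") thread. */
    void runAll() {
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
    }
}

/** Hypothetical driver whose state (the current pods watch) is only touched in the main thread. */
final class MainThreadRewatchDriver {
    private final Executor mainThreadExecutor;
    private final Supplier<String> watchTaskManagerPods; // hypothetical: returns a watch id
    private String podsWatch; // main thread only
    private Throwable lastError; // main thread only

    MainThreadRewatchDriver(Executor mainThreadExecutor, Supplier<String> watchTaskManagerPods) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.watchTaskManagerPods = watchTaskManagerPods;
    }

    void start() {
        mainThreadExecutor.execute(() -> podsWatch = watchTaskManagerPods.get());
    }

    /** Called from the Kubernetes client's watcher thread; never mutates fields directly. */
    void handleError(Throwable throwable) {
        mainThreadExecutor.execute(() -> {
            if (throwable instanceof ResourceVersionTooOldException) {
                podsWatch = watchTaskManagerPods.get();
            } else {
                // In the real driver this would be forwarded to the resource event handler.
                lastError = throwable;
            }
        });
    }

    /** Call from the main thread only. */
    String currentWatch() {
        return podsWatch;
    }

    /** Call from the main thread only. */
    Throwable lastError() {
        return lastError;
    }
}
```

Compared with a lock, this keeps the component single-threaded and needs no synchronization on its fields; the cost is that the error is handled asynchronously, after whatever the main thread is currently doing. The same hand-off would apply to the `KubernetesLeaderRetrievalDriver` case.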