tillrohrmann commented on a change in pull request #14837:
URL: https://github.com/apache/flink/pull/14837#discussion_r571130872



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -360,7 +364,12 @@ public void onError(List<KubernetesPod> pods) {
 
         @Override
         public void handleFatalError(Throwable throwable) {

Review comment:
       Maybe we should rename this method to `handleError`, because it now 
also handles non-fatal errors such as the 
`KubernetesTooOldResourceVersionException`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/AbstractKubernetesWatcher.java
##########
@@ -45,7 +47,16 @@ public void onClose(KubernetesClientException cause) {
         if (cause == null) {
             logger.info("The watcher is closing.");
         } else {
-            callbackHandler.handleFatalError(cause);
+            if (cause.getCode() == HTTP_GONE) {

Review comment:
       Maybe add a comment linking to the explanation of this Fabric8 behaviour.
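
       For context, the branch above keys on HTTP 410 (Gone), which is the 
status the Kubernetes API server returns when a watch is started from a 
resource version that is too old. A minimal sketch of that translation 
follows. `ClientException`, `TooOldResourceVersionException` and 
`translateCloseCause` are hypothetical stand-ins for illustration, not the 
classes in this PR or in Fabric8.

```java
import static java.net.HttpURLConnection.HTTP_GONE;

class WatchCloseSketch {

    /** Hypothetical stand-in for the client exception; only the HTTP status code matters here. */
    static class ClientException extends RuntimeException {
        private final int code;

        ClientException(String message, int code) {
            super(message);
            this.code = code;
        }

        int getCode() {
            return code;
        }
    }

    /** Hypothetical marker for the recoverable "resource version too old" case. */
    static class TooOldResourceVersionException extends RuntimeException {
        TooOldResourceVersionException(Throwable cause) {
            super(cause);
        }
    }

    /** Wraps a 410 close cause in the recoverable marker; any other cause is returned unchanged. */
    static Throwable translateCloseCause(ClientException cause) {
        if (cause.getCode() == HTTP_GONE) {
            return new TooOldResourceVersionException(cause);
        }
        return cause;
    }
}
```

       Keeping the status check in one place like this would also give the 
requested link a natural home as a comment next to `HTTP_GONE`.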

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionDriver.java
##########
@@ -235,9 +238,16 @@ public void onError(List<KubernetesConfigMap> configMaps) {
 
         @Override
         public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderElectionException(
-                            "Error while watching the ConfigMap " + configMapName, throwable));
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                kubernetesWatch =
+                        kubeClient.watchConfigMaps(
+                                configMapName, new ConfigMapCallbackHandlerImpl());

Review comment:
       I think this violates the threading model of this component, which does 
not allow state changes from a different thread. We need to either make this 
component thread-safe or tell the `LeaderElectionEventHandler` about the 
required restart.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -360,7 +364,12 @@ public void onError(List<KubernetesPod> pods) {
 
         @Override
         public void handleFatalError(Throwable throwable) {
-            getResourceEventHandler().onError(throwable);
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                log.info("Creating a new watch on TaskManager pods.");
+                podsWatchOpt = watchTaskManagerPods();

Review comment:
       I think this should happen in the main thread context because we are 
modifying the state of the `KubernetesResourceManagerDriver`.
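
       A rough sketch of what I have in mind, under the assumption that the 
driver has access to an executor that runs tasks on its main thread. All 
names here (`WatchRestartSketch`, `mainThreadExecutor`, `recreateWatch`, 
`watchGeneration`) are hypothetical and only illustrate the pattern: the 
watcher thread decides nothing about state, it only hands the restart over to 
the main thread.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class WatchRestartSketch {

    /** Hypothetical stand-in for the recoverable "resource version too old" error. */
    static class TooOldResourceVersionException extends RuntimeException {
        TooOldResourceVersionException(String message) {
            super(message);
        }
    }

    private final Executor mainThreadExecutor;
    private final Consumer<Throwable> fatalErrorHandler;

    // Mutable state: in this sketch it is only changed by tasks run via mainThreadExecutor.
    private int watchGeneration = 0;

    WatchRestartSketch(Executor mainThreadExecutor, Consumer<Throwable> fatalErrorHandler) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    /** Called from the watcher thread; must not touch driver state directly. */
    void handleError(Throwable throwable) {
        if (throwable instanceof TooOldResourceVersionException) {
            // Recoverable: schedule the watch restart on the main thread.
            mainThreadExecutor.execute(this::recreateWatch);
        } else {
            // Everything else is still forwarded as a fatal error.
            fatalErrorHandler.accept(throwable);
        }
    }

    /** Stands in for re-creating the watch; here it only bumps a counter. */
    private void recreateWatch() {
        watchGeneration++;
    }

    int getWatchGeneration() {
        return watchGeneration;
    }
}
```

       With an executor that merely queues tasks, `handleError` leaves 
`watchGeneration` untouched until the queued task is run, which is the 
property the threading model needs.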

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalDriver.java
##########
@@ -115,9 +120,16 @@ public void onError(List<KubernetesConfigMap> configMaps) {
 
         @Override
         public void handleFatalError(Throwable throwable) {
-            fatalErrorHandler.onFatalError(
-                    new LeaderRetrievalException(
-                            "Error while watching the ConfigMap " + configMapName));
+            if (throwable instanceof KubernetesTooOldResourceVersionException) {
+                LOG.info("Creating a new watch on ConfigMap {}.", configMapName);
+                kubernetesWatch =
+                        kubeClient.watchConfigMaps(
+                                configMapName, new ConfigMapCallbackHandlerImpl());

Review comment:
       I think the same applies here: this changes state from a different thread.





