xintongsong commented on a change in pull request #13755:
URL: https://github.com/apache/flink/pull/13755#discussion_r510812845



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -64,7 +65,7 @@
 
        private final Time podCreationRetryInterval;
 
-       private final FlinkKubeClient kubeClient;
+       private FlinkKubeClient kubeClient;

Review comment:
       ```suggestion
        private Optional<FlinkKubeClient> kubeClientOpt;
   ```
   The client is used a lot in this driver. I would suggest using `Optional` 
here to force an existence check whenever it is used.
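
   A minimal sketch of what the `Optional` field could look like. `KubeClientSketch` and `DriverSketch` are hypothetical stand-ins rather than the real `FlinkKubeClient` and driver classes, and the method names are assumptions made only for illustration:

```java
import java.util.Optional;

// Hypothetical stand-in for FlinkKubeClient; not the real Flink interface.
interface KubeClientSketch {
    void stopPod(String podName);
}

// Hypothetical, stripped-down driver that only shows the Optional handling.
class DriverSketch {
    // Empty until initialize() has run, mirroring the suggested kubeClientOpt field.
    private Optional<KubeClientSketch> kubeClientOpt = Optional.empty();

    void initialize(KubeClientSketch client) {
        kubeClientOpt = Optional.of(client);
    }

    void releasePod(String podName) {
        // Every use goes through the Optional, so a missing client fails
        // with a clear message instead of a NullPointerException.
        kubeClientOpt
                .orElseThrow(() -> new IllegalStateException("Kube client is not initialized"))
                .stopPod(podName);
    }
}
```

   With this shape, calling `releasePod` before `initialize` throws an `IllegalStateException`.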

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -85,23 +86,31 @@
 
        public KubernetesResourceManagerDriver(
                        Configuration flinkConfig,
-                       FlinkKubeClient kubeClient,
                        KubernetesResourceManagerDriverConfiguration configuration) {
                super(flinkConfig, GlobalConfiguration.loadConfiguration());
-
                this.clusterId = Preconditions.checkNotNull(configuration.getClusterId());
                this.podCreationRetryInterval = Preconditions.checkNotNull(configuration.getPodCreationRetryInterval());
-               this.kubeClient = Preconditions.checkNotNull(kubeClient);
                this.requestResourceFutures = new HashMap<>();
                this.podCreationCoolDown = FutureUtils.completedVoidFuture();
        }
 
+       public KubernetesResourceManagerDriver(
+                       Configuration flinkConfig,
+                       FlinkKubeClient kubeClient,
+                       KubernetesResourceManagerDriverConfiguration configuration) {
+               this(flinkConfig, configuration);
+               this.kubeClient = Preconditions.checkNotNull(kubeClient);
+       }

Review comment:
       Instead of introducing another constructor, I think we can have only one 
constructor that takes `KubeClientFactory` as an argument. We would also need 
to make `KubeClientFactory` an interface and provide a singleton 
implementation.
   In this way, the tests can provide a testing factory implementation, without 
needing a separate code path in the production code.
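
   A rough sketch of the idea, under stated assumptions. All names ending in `Sketch` or `Stub` are hypothetical, and the factory signature (a cluster id plus an executor) is simplified compared to the existing `KubeClientFactory.fromConfiguration(flinkConfig, getIoExecutor())`:

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for FlinkKubeClient.
interface KubeClientStub {
    String name();
}

// The factory as an interface; this signature is an assumption.
interface KubeClientFactorySketch {
    KubeClientStub create(String clusterId, Executor ioExecutor);
}

// Singleton production implementation, here as a single-element enum.
enum DefaultKubeClientFactorySketch implements KubeClientFactorySketch {
    INSTANCE;

    @Override
    public KubeClientStub create(String clusterId, Executor ioExecutor) {
        // A real implementation would build the actual client here.
        return () -> "default-client-for-" + clusterId;
    }
}

// Hypothetical driver with a single constructor and a single code path.
class FactoryDriverSketch {
    private final String clusterId;
    private final KubeClientFactorySketch kubeClientFactory;
    private KubeClientStub kubeClient;

    FactoryDriverSketch(String clusterId, KubeClientFactorySketch kubeClientFactory) {
        this.clusterId = clusterId;
        this.kubeClientFactory = kubeClientFactory;
    }

    void initialize(Executor ioExecutor) {
        // Production and tests both create the client here.
        kubeClient = kubeClientFactory.create(clusterId, ioExecutor);
    }

    String clientName() {
        return kubeClient.name();
    }
}
```

   A test could then pass something like `(id, executor) -> () -> "testing-client"` as the factory, while production code passes `DefaultKubeClientFactorySketch.INSTANCE`, so the client creation in the initialization step is exercised in both cases.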

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -85,23 +86,31 @@
 
        public KubernetesResourceManagerDriver(
                        Configuration flinkConfig,
-                       FlinkKubeClient kubeClient,
                        KubernetesResourceManagerDriverConfiguration configuration) {
                super(flinkConfig, GlobalConfiguration.loadConfiguration());
-
                this.clusterId = Preconditions.checkNotNull(configuration.getClusterId());
                this.podCreationRetryInterval = Preconditions.checkNotNull(configuration.getPodCreationRetryInterval());
-               this.kubeClient = Preconditions.checkNotNull(kubeClient);
                this.requestResourceFutures = new HashMap<>();
                this.podCreationCoolDown = FutureUtils.completedVoidFuture();
        }
 
+       public KubernetesResourceManagerDriver(
+                       Configuration flinkConfig,
+                       FlinkKubeClient kubeClient,
+                       KubernetesResourceManagerDriverConfiguration configuration) {
+               this(flinkConfig, configuration);
+               this.kubeClient = Preconditions.checkNotNull(kubeClient);
+       }
+
        // ------------------------------------------------------------------------
        //  ResourceManagerDriver
        // ------------------------------------------------------------------------
 
        @Override
        protected void initializeInternal() throws Exception {
+               if (kubeClient == null) {
+                       kubeClient = KubeClientFactory.fromConfiguration(flinkConfig, getIoExecutor());

Review comment:
       Currently this line of code cannot be covered by the unit tests, because 
the tests use a different constructor.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -64,7 +65,7 @@
 
        private final Time podCreationRetryInterval;
 
-       private final FlinkKubeClient kubeClient;
+       private FlinkKubeClient kubeClient;

Review comment:
       Actually, we should also have such protection for `podsWatch`. But it is 
not a problem of this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

