xintongsong commented on a change in pull request #13755: URL: https://github.com/apache/flink/pull/13755#discussion_r510812845
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ########## @@ -64,7 +65,7 @@ private final Time podCreationRetryInterval; - private final FlinkKubeClient kubeClient; + private FlinkKubeClient kubeClient; Review comment: ```suggestion private Optional<FlinkKubeClient> kubeClientOpt; ``` The client is used a lot in this driver. I would suggest to use `Optional` here to force a existence check whenever it is used. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ########## @@ -85,23 +86,31 @@ public KubernetesResourceManagerDriver( Configuration flinkConfig, - FlinkKubeClient kubeClient, KubernetesResourceManagerDriverConfiguration configuration) { super(flinkConfig, GlobalConfiguration.loadConfiguration()); - this.clusterId = Preconditions.checkNotNull(configuration.getClusterId()); this.podCreationRetryInterval = Preconditions.checkNotNull(configuration.getPodCreationRetryInterval()); - this.kubeClient = Preconditions.checkNotNull(kubeClient); this.requestResourceFutures = new HashMap<>(); this.podCreationCoolDown = FutureUtils.completedVoidFuture(); } + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerDriverConfiguration configuration) { + this(flinkConfig, configuration); + this.kubeClient = Preconditions.checkNotNull(kubeClient); + } Review comment: Instead of introducing another constructor, I think we can have only one constructor that takes `KubeClientFactory` as an argument. We would also need to make `KubeClientFactory` and interface and provide a singleton implementation. In this way, the tests can provide a testing factory implementation, without needing to have a separate code path in the production codes. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ########## @@ -85,23 +86,31 @@ public KubernetesResourceManagerDriver( Configuration flinkConfig, - FlinkKubeClient kubeClient, KubernetesResourceManagerDriverConfiguration configuration) { super(flinkConfig, GlobalConfiguration.loadConfiguration()); - this.clusterId = Preconditions.checkNotNull(configuration.getClusterId()); this.podCreationRetryInterval = Preconditions.checkNotNull(configuration.getPodCreationRetryInterval()); - this.kubeClient = Preconditions.checkNotNull(kubeClient); this.requestResourceFutures = new HashMap<>(); this.podCreationCoolDown = FutureUtils.completedVoidFuture(); } + public KubernetesResourceManagerDriver( + Configuration flinkConfig, + FlinkKubeClient kubeClient, + KubernetesResourceManagerDriverConfiguration configuration) { + this(flinkConfig, configuration); + this.kubeClient = Preconditions.checkNotNull(kubeClient); + } + // ------------------------------------------------------------------------ // ResourceManagerDriver // ------------------------------------------------------------------------ @Override protected void initializeInternal() throws Exception { + if (kubeClient == null) { + kubeClient = KubeClientFactory.fromConfiguration(flinkConfig, getIoExecutor()); Review comment: Currently this line of code cannot be covered by the unit tests, due to usage of a different constructor. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java ########## @@ -64,7 +65,7 @@ private final Time podCreationRetryInterval; - private final FlinkKubeClient kubeClient; + private FlinkKubeClient kubeClient; Review comment: Actually, we should also have such protection for `podsWatch`. But it is not a problem of this PR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org