xintongsong commented on a change in pull request #13755:
URL: https://github.com/apache/flink/pull/13755#discussion_r510580616



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManagerDriver.java
##########
@@ -85,10 +86,20 @@
 
        public KubernetesResourceManagerDriver(
                        Configuration flinkConfig,
-                       FlinkKubeClient kubeClient,
                        KubernetesResourceManagerDriverConfiguration 
configuration) {
                super(flinkConfig, GlobalConfiguration.loadConfiguration());
+               this.clusterId = 
Preconditions.checkNotNull(configuration.getClusterId());
+               this.podCreationRetryInterval = 
Preconditions.checkNotNull(configuration.getPodCreationRetryInterval());
+               this.kubeClient = 
KubeClientFactory.fromConfiguration(flinkConfig, getIoExecutor());

Review comment:
       I'm afraid this would not work.
   
   `AbstractResourceManagerDriver#ioExecutor` is `null` until the driver is 
initialized. The constructor is executed before the initialization, thus 
`getIoExecutor` should throw `IllegalStateException`.
   
   I think this also exposes the shortcoming for using different constructors 
for production and testing. The `getIoExecutor` in the constructor is not 
covered by any test.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java
##########
@@ -71,7 +74,8 @@ public static FlinkKubeClient fromConfiguration(Configuration 
flinkConfig) {
 
                final KubernetesClient client = new 
DefaultKubernetesClient(config);
 
-               return new Fabric8FlinkKubeClient(flinkConfig, client, 
KubeClientFactory::createThreadPoolForAsyncIO);
+               return new Fabric8FlinkKubeClient(flinkConfig, client, 
ioExecutor == null ?
+                               KubeClientFactory::createThreadPoolForAsyncIO : 
() -> ioExecutor);

Review comment:
       Might be better to derive `ioExecutor` in 
`fromConfiguration(Configuration flinkConfig)`.
   Benefits are:
   - Avoid passing null value arguments.
   - Improves readability that `fromConfiguration(Configuration flinkConfig)` 
itself shows which executor will be used when the user does not specify one.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DefaultDispatcherResourceManagerComponentFactory.java
##########
@@ -69,7 +69,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;

Review comment:
       I'm not sure about replacing `Executor` with `ExecutorService` for all 
these places.
   
   Looking into `Fabric8FlinkKubeClient`, I think the only reason 
`FlinkKubeClient` uses `ExecutorService` rather than `Executor` is that, it 
will shutdown and release the thread pool in `close`. That is because the 
client uses a dedicated thread pool, and if it does not shutdown the thread 
pool, other components won't do that either.
   
   Since now the client uses a shared IO executor, it should no longer shutdown 
the executor on close. Otherwise, all the other components using that IO 
executor would be affected. That means we can replace `ExecutorService` with 
`Executor` for `FlinkKubeClient`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to