wangyang0918 commented on a change in pull request #18119:
URL: https://github.com/apache/flink/pull/18119#discussion_r772792426



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java
##########
@@ -36,13 +42,48 @@ public KubernetesSessionClusterEntrypoint(Configuration configuration) {
         super(configuration);
     }
 
+    private int blobPort;
+
     @Override
     protected DispatcherResourceManagerComponentFactory
             createDispatcherResourceManagerComponentFactory(Configuration configuration) {
         return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                 KubernetesResourceManagerFactory.getInstance());
     }
 
+    @Override
+    protected void updateBlobPort(int port) {

Review comment:
       Adding the `updateBlobPort()` and `additional()` interfaces to `ClusterEntrypoint` and implementing them here is really not a good approach. If we want to update the K8s internal/external services, that could happen in `KubernetesResourceManagerDriver#initializeInternal()`.
   
   BTW, we also need to support host network for application mode, not only for session mode.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java
##########
@@ -36,13 +42,48 @@ public KubernetesSessionClusterEntrypoint(Configuration configuration) {
         super(configuration);
     }
 
+    private int blobPort;
+
     @Override
     protected DispatcherResourceManagerComponentFactory
             createDispatcherResourceManagerComponentFactory(Configuration configuration) {
         return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                 KubernetesResourceManagerFactory.getInstance());
     }
 
+    @Override
+    protected void updateBlobPort(int port) {
+        this.blobPort = port;
+    }
+
+    @Override
+    protected String getRPCPortRange(Configuration configuration) {

Review comment:
       If we override `HA_JOB_MANAGER_PORT_RANGE` and `JobManagerOptions.PORT` to 0 in `KubernetesEntrypointUtils#loadConfiguration` when host network is enabled, then we do not need this change here.
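
A minimal sketch of that override, assuming a plain `Map<String, String>` as a stand-in for Flink's `Configuration`. The class and method names are hypothetical; the two port keys are the ones behind `JobManagerOptions.PORT` and `HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE`, and the host network key is the option this PR introduces.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; a plain Map stands in for Flink's Configuration. */
final class HostNetworkPortOverride {

    // Option added by this PR (see KubernetesConfigOptions in the diff).
    static final String HOST_NETWORK_ENABLED = "kubernetes.hostnetwork.enabled";
    // Keys behind JobManagerOptions.PORT and HA_JOB_MANAGER_PORT_RANGE.
    static final String JM_RPC_PORT = "jobmanager.rpc.port";
    static final String HA_JM_PORT_RANGE = "high-availability.jobmanager.port";

    private HostNetworkPortOverride() {}

    /** Returns a copy of the config with both RPC ports set to 0 when host network is on. */
    static Map<String, String> overridePortsForHostNetwork(Map<String, String> flinkConfig) {
        Map<String, String> result = new HashMap<>(flinkConfig);
        if (Boolean.parseBoolean(result.getOrDefault(HOST_NETWORK_ENABLED, "false"))) {
            // Port 0 lets the OS pick a free port, which avoids clashes on a shared node.
            result.put(JM_RPC_PORT, "0");
            result.put(HA_JM_PORT_RANGE, "0");
        }
        return result;
    }
}
```

With the override done once while loading the configuration on the JobManager side, the entrypoint classes would not need their own port-range hook.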

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
##########
@@ -132,9 +133,11 @@ private Pod decoratePod(Pod pod) {
         for (File file : localLogFiles) {
             data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
         }
-
-        final Map<String, String> propertiesMap =
-                getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
+        Configuration configuration = kubernetesComponentConf.getFlinkConfiguration();

Review comment:
       As I said above, updating the port could be moved to the JM side.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java
##########
@@ -93,6 +94,12 @@ private static Deployment createJobManagerDeployment(
         final Pod resolvedPod =
                 new PodBuilder(flinkPod.getPodWithoutMainContainer())
                         .editOrNewSpec()
+                        .withHostNetwork(

Review comment:
       I do not think we need to set the host network here.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -86,6 +93,44 @@
 
     private static final YAMLMapper yamlMapper = new YAMLMapper();

Review comment:
       Once all other comments are addressed, I believe most of the changes in `KubernetesUtils.java` will no longer be needed.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -201,6 +201,23 @@ KubernetesLeaderElector createLeaderElector(
      */
     KubernetesPod loadPodFromTemplateFile(File podTemplateFile);
 
+    /**
+     * Update InternalService targetPort to real value.
+     *
+     * @param jobManagerRpcPort Real jobManagerRpcPort
+     * @param blobServerPort Real blobServerPort
+     * @return Return a modified InternalService that can be used correctly
+     */
+    KubernetesService updateInternalServicePort(int jobManagerRpcPort, int blobServerPort);

Review comment:
       I believe we could have a unified interface `CompletableFuture<Void> updateServicePort(String svcName, String portName, int targetPort)`. Then the implementation could also be simplified.
   
   ```
       public void updateServicePort(String svcName, String portName, int targetPort) {
          ... ...
           getService(clusterId)
                   .ifPresent(
                           service -> {
                               final Service updatedService =
                                       new ServiceBuilder(service.getInternalResource())
                                               .editSpec()
                                               .editMatchingPort(
                                                       servicePortBuilder ->
                                                               servicePortBuilder
                                                                       .build()
                                                                       .getName()
                                                                       .equals(portName))
                                               // Point the matched port at the real target port.
                                               .withTargetPort(new IntOrString(targetPort))
                                               .endPort()
                                               .endSpec()
                                               .build();
                               this.internalClient
                                       .services()
                                       .withName(svcName)
                                       .replace(updatedService);
                           });
          ...  ...
       }
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
##########
@@ -137,17 +145,30 @@ private Container decorateMainContainer(Container container) {
                 .withResources(resourceRequirements);
 
         // Merge fields
-        mainContainerBuilder
-                .addToPorts(
-                        new ContainerPortBuilder()
-                                .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
-                                .withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
-                                .build())
-                .addAllToEnv(getCustomizedEnvs());
-
+        mainContainerPortBuilder(mainContainerBuilder).addAllToEnv(getCustomizedEnvs());
         return mainContainerBuilder.build();
     }
 
+    private ContainerBuilder mainContainerPortBuilder(ContainerBuilder builder) {
+        ContainerPort containerPort = getContainerPorts();
+        if (null != containerPort) {
+            builder.addToPorts(containerPort);
+        }
+        return builder;
+    }
+
+    private ContainerPort getContainerPorts() {
+        if (kubernetesTaskManagerParameters
+                .getFlinkConfiguration()
+                .getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED)) {
+            return null;
+        }
+        return new ContainerPortBuilder()
+                .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
+                .withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
+                .build();
+    }
+

Review comment:
       ```suggestion
   
       private List<ContainerPort> getContainerPorts() {
           if (kubernetesTaskManagerParameters.isHostNetworkEnabled()) {
               return Collections.emptyList();
           }
           return Collections.singletonList(
                   new ContainerPortBuilder()
                           .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
                           .withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
                           .build());
       }
   
   ```
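
A self-contained sketch of why returning an empty list helps: the caller can append the result unconditionally instead of checking for `null`. `Port` is a hypothetical stand-in for fabric8's `ContainerPort`, and the class name, port name, and method signatures are illustrative assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch; Port stands in for fabric8's ContainerPort. */
final class ContainerPortsSketch {

    static final class Port {
        final String name;
        final int containerPort;

        Port(String name, int containerPort) {
            this.name = name;
            this.containerPort = containerPort;
        }
    }

    private ContainerPortsSketch() {}

    /** Mirrors the suggested getContainerPorts(): nothing is declared under host network. */
    static List<Port> getContainerPorts(boolean hostNetworkEnabled, int rpcPort) {
        if (hostNetworkEnabled) {
            return Collections.emptyList();
        }
        // The port name here is illustrative only.
        return Collections.singletonList(new Port("taskmanager-rpc", rpcPort));
    }

    /** The caller adds whatever comes back; an empty list is simply a no-op. */
    static List<Port> decorate(List<Port> existingPorts, boolean hostNetworkEnabled, int rpcPort) {
        List<Port> merged = new ArrayList<>(existingPorts);
        merged.addAll(getContainerPorts(hostNetworkEnabled, rpcPort));
        return merged;
    }
}
```

In the decorator itself this would correspond to calling `addAllToPorts(getContainerPorts())` on the container builder, so the extra `mainContainerPortBuilder` helper is not needed.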

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -25,6 +25,9 @@
     public static final String API_VERSION = "v1";
     public static final String APPS_API_VERSION = "apps/v1";
 
+    public static final String DNS_PLOICY_DEFAULT = "ClusterFirst";

Review comment:
       I am not sure whether the default value of `dnsPolicy` is `Default` or `ClusterFirst`.
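
For reference, the Kubernetes documentation states that `ClusterFirst` is used when `dnsPolicy` is not set (`Default` is, despite its name, not the default), and that pods running with `hostNetwork: true` should set `ClusterFirstWithHostNet` explicitly. A small sketch of choosing the policy under that assumption; the class, constant, and method names below are hypothetical:

```java
/** Hypothetical sketch of picking a pod dnsPolicy depending on host network mode. */
final class DnsPolicySketch {

    // Kubernetes applies ClusterFirst when a pod spec leaves dnsPolicy unset.
    static final String DNS_POLICY_DEFAULT = "ClusterFirst";
    // Host network pods need this value to keep resolving cluster-internal names.
    static final String DNS_POLICY_HOST_NETWORK = "ClusterFirstWithHostNet";

    private DnsPolicySketch() {}

    static String dnsPolicyFor(boolean hostNetworkEnabled) {
        return hostNetworkEnabled ? DNS_POLICY_HOST_NETWORK : DNS_POLICY_DEFAULT;
    }
}
```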

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -452,6 +452,14 @@
                                     + "It will help to achieve faster recovery. "
                                     + "Notice that high availability should be enabled when starting standby JobManagers.");
 
+    public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED =
+            key("kubernetes.hostnetwork.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable HostNetwork mode."
+                                    + "When enabled, the network access between pods will directly use the host network instead of the CNI plug-in");

Review comment:
       ```suggestion
                       .withDescription(
                               "Whether to enable HostNetwork mode. "
                                       + "HostNetwork allows the pod to use the node network namespace "
                                       + "instead of the individual pod network namespace.");
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -240,20 +237,11 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio
 
         flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
 
-        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
-        KubernetesUtils.checkAndUpdatePortConfigOption(
-                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
-        KubernetesUtils.checkAndUpdatePortConfigOption(
-                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
-        KubernetesUtils.checkAndUpdatePortConfigOption(
-                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
+        KubernetesUtils.updatePorts(flinkConfig);
 
         if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
             flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
-            KubernetesUtils.checkAndUpdatePortConfigOption(
-                    flinkConfig,
-                    HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
-                    flinkConfig.get(JobManagerOptions.PORT));
+            KubernetesUtils.updateHighAvailabilityPort(flinkConfig);

Review comment:
       I think we could override the ports in `KubernetesEntrypointUtils#loadConfiguration`, just as we do in `YarnEntrypointUtils#loadConfiguration`.
   
   Then `KubernetesClusterDescriptor` will keep the same behavior as before.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
##########
@@ -153,6 +161,11 @@ private Container decorateMainContainer(Container container) {
     }
 
     private List<ContainerPort> getContainerPorts() {
+        if (kubernetesJobManagerParameters
+                .getFlinkConfiguration()
+                .getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED)) {
+            return Lists.newArrayList();
+        }

Review comment:
       ```suggestion
           if (kubernetesJobManagerParameters.isHostNetworkEnabled()) {
               return Collections.emptyList();
           }
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java
##########
@@ -59,6 +60,12 @@ public static KubernetesPod buildTaskManagerKubernetesPod(
         final Pod resolvedPod =
                 new PodBuilder(flinkPod.getPodWithoutMainContainer())
                         .editOrNewSpec()
+                        .withHostNetwork(

Review comment:
       Same as above.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypointTest.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link KubernetesSessionClusterEntrypoint}. */
+public class KubernetesSessionClusterEntrypointTest extends TestLogger {
+
+    @Test
+    public void testGetRPCPortRange() {

Review comment:
       This test becomes unnecessary once the above comments are addressed.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -250,6 +249,55 @@ public void testServiceLoadBalancerNullHostAndIP() {
         assertThat(resultEndpoint.isPresent(), is(false));
     }
 
+    @Test
+    public void testUpdateInternalServicePort() {

Review comment:
       We do not need to mock the internal/external service. I think the test could follow these steps:
   * Use `flinkKubeClient.createJobManagerComponent()` to create the accompanying resources, including the services
   * Verify that the services exist
   * Update the service target port
   * Verify the updated service



