wangyang0918 commented on a change in pull request #18119: URL: https://github.com/apache/flink/pull/18119#discussion_r772792426
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java ########## @@ -36,13 +42,48 @@ public KubernetesSessionClusterEntrypoint(Configuration configuration) { super(configuration); } + private int blobPort; + @Override protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory( KubernetesResourceManagerFactory.getInstance()); } + @Override + protected void updateBlobPort(int port) { Review comment: Adding the interface `updateBlobPort()` `additional()` in `ClusterEntrypoint` and doing the implementation here are really not a good proposal. If we want to update the K8s internal/external services, then it could happen in `KubernetesResourceManagerDriver#initializeInternal()`. BTW, not only the session mode, we also need to support host network for application mode. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java ########## @@ -36,13 +42,48 @@ public KubernetesSessionClusterEntrypoint(Configuration configuration) { super(configuration); } + private int blobPort; + @Override protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory( KubernetesResourceManagerFactory.getInstance()); } + @Override + protected void updateBlobPort(int port) { + this.blobPort = port; + } + + @Override + protected String getRPCPortRange(Configuration configuration) { Review comment: If we overwrite the `HA_JOB_MANAGER_PORT_RANGE` and `JobManagerOptions.PORT` to 0 in `KubernetesEntrypointUtils#loadConfiguration` when host network enabled, then we do not need this change here. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java ########## @@ -132,9 +133,11 @@ private Pod decoratePod(Pod pod) { for (File file : localLogFiles) { data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); } - - final Map<String, String> propertiesMap = - getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration()); + Configuration configuration = kubernetesComponentConf.getFlinkConfiguration(); Review comment: Just like what I said above, updating the port could be moved in JM side. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java ########## @@ -93,6 +94,12 @@ private static Deployment createJobManagerDeployment( final Pod resolvedPod = new PodBuilder(flinkPod.getPodWithoutMainContainer()) .editOrNewSpec() + .withHostNetwork( Review comment: I do not think we need to set the host network here. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java ########## @@ -86,6 +93,44 @@ private static final YAMLMapper yamlMapper = new YAMLMapper(); Review comment: After all other comments are addressed, I believe we do not need so many inessential changes in `KubernetesUtils.java`. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java ########## @@ -201,6 +201,23 @@ KubernetesLeaderElector createLeaderElector( */ KubernetesPod loadPodFromTemplateFile(File podTemplateFile); + /** + * Update InternalService targetPort to real value. + * + * @param jobManagerRpcPort Real jobManagerRpcPort + * @param blobServerPort Real blobServerPort + * @return Return a modified InternalService that can be used correctly + */ + KubernetesService updateInternalServicePort(int jobManagerRpcPort, int blobServerPort); Review comment: I believe we could have a unified interface `CompletableFuture<Void> updateServicePort(String svcName, String portName, int targetPort)`. Then the implementation could also be simplified. ``` public void updateServicePort(String svcName, String portName, String targetPort) { ... ... getService(clusterId) .ifPresent( service -> { final Service updatedService = new ServiceBuilder(service.getInternalResource()) .editSpec() .editMatchingPort( servicePortBuilder -> servicePortBuilder .build() .getName() .equals(portName)) .endPort() .endSpec() .build(); this.internalClient .services() .withName(svcName) .replace(updatedService); }); ... ... } ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java ########## @@ -137,17 +145,30 @@ private Container decorateMainContainer(Container container) { .withResources(resourceRequirements); // Merge fields - mainContainerBuilder - .addToPorts( - new ContainerPortBuilder() - .withName(Constants.TASK_MANAGER_RPC_PORT_NAME) - .withContainerPort(kubernetesTaskManagerParameters.getRPCPort()) - .build()) - .addAllToEnv(getCustomizedEnvs()); - + mainContainerPortBuilder(mainContainerBuilder).addAllToEnv(getCustomizedEnvs()); return mainContainerBuilder.build(); } + private ContainerBuilder mainContainerPortBuilder(ContainerBuilder builder) { + ContainerPort containerPort = getContainerPorts(); + if (null != containerPort) { + builder.addToPorts(containerPort); + } + return builder; + } + + private ContainerPort getContainerPorts() { + if (kubernetesTaskManagerParameters + .getFlinkConfiguration() + .getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED)) { + return null; + } + return new ContainerPortBuilder() + .withName(Constants.TASK_MANAGER_RPC_PORT_NAME) + .withContainerPort(kubernetesTaskManagerParameters.getRPCPort()) + .build(); + } + Review comment: ```suggestion private List<ContainerPort> getContainerPorts() { if (kubernetesTaskManagerParameters.isHostNetworkEnabled()) { return Collections.emptyList(); } return Collections.singletonList( new ContainerPortBuilder() .withName(Constants.TASK_MANAGER_RPC_PORT_NAME) .withContainerPort(kubernetesTaskManagerParameters.getRPCPort()) .build()); } ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java ########## @@ -25,6 +25,9 @@ public static final String API_VERSION = "v1"; public static final String APPS_API_VERSION = "apps/v1"; + public static final String DNS_PLOICY_DEFAULT = "ClusterFirst"; Review comment: I am not sure the default value of `dnsPolicy` is `Default` or `ClusterFirst`. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java ########## @@ -452,6 +452,14 @@ + "It will help to achieve faster recovery. " + "Notice that high availability should be enabled when starting standby JobManagers."); + public static final ConfigOption<Boolean> KUBERNETES_HOSTNETWORK_ENABLED = + key("kubernetes.hostnetwork.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable HostNetwork mode." + + "When enabled, the network access between pods will directly use the host network instead of the CNI plug-in"); Review comment: ```suggestion .withDescription( "Whether to enable HostNetwork mode. " + "The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. "); ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java ########## @@ -240,20 +237,11 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); - // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. - KubernetesUtils.checkAndUpdatePortConfigOption( - flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); - KubernetesUtils.checkAndUpdatePortConfigOption( - flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); - KubernetesUtils.checkAndUpdatePortConfigOption( - flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); + KubernetesUtils.updatePorts(flinkConfig); if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); - KubernetesUtils.checkAndUpdatePortConfigOption( - flinkConfig, - HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, - flinkConfig.get(JobManagerOptions.PORT)); + KubernetesUtils.updateHighAvailabilityPort(flinkConfig); Review comment: I think we could do the port overridden in `KubernetesEntrypointUtils#loadConfiguration` just like what we have do in the `YarnEntrypointUtils#loadConfiguration`. Then the `KubernetesClusterDescriptor` will keep the same behavior with before. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java ########## @@ -153,6 +161,11 @@ private Container decorateMainContainer(Container container) { } private List<ContainerPort> getContainerPorts() { + if (kubernetesJobManagerParameters + .getFlinkConfiguration() + .getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED)) { + return Lists.newArrayList(); + } Review comment: ```suggestion if (kubernetesJobManagerParameters.isHostNetworkEnabled()) { return Collections.emptyList(); } ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesTaskManagerFactory.java ########## @@ -59,6 +60,12 @@ public static KubernetesPod buildTaskManagerKubernetesPod( final Pod resolvedPod = new PodBuilder(flinkPod.getPodWithoutMainContainer()) .editOrNewSpec() + .withHostNetwork( Review comment: Same as above. ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypointTest.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link KubernetesSessionClusterEntrypoint}. */ +public class KubernetesSessionClusterEntrypointTest extends TestLogger { + + @Test + public void testGetRPCPortRange() { Review comment: Unnecessary test after addressing the above comments. ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java ########## @@ -250,6 +249,55 @@ public void testServiceLoadBalancerNullHostAndIP() { assertThat(resultEndpoint.isPresent(), is(false)); } + @Test + public void testUpdateInternalServicePort() { Review comment: We do not need to mock the internal/external service. I think the test could be verified in the following steps. * Use `flinkKubeClient.createJobManagerComponent()` to create the accompanying resources, including the services * Verify the service existence * Update the service target port * Verify the updated service -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org