zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] 
Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409334611
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
        }
 
        /**
-        * To get nodePort of configured ports.
+        * Get rest port from the external Service.
         */
-       private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-               final int port = this.flinkConfig.getInteger(configPort);
-               if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-                       for (ServicePort p : service.getSpec().getPorts()) {
-                               if (p.getPort() == port) {
-                                       return p.getNodePort();
-                               }
-                       }
+       private int getRestPortFromExternalService(Service externalService) {
+               final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+                       .stream()
+                       .filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+                       .collect(Collectors.toList());
+
+               if (servicePortCandidates.isEmpty()) {
+                       throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+                               KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+               }
+
+               final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+               final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+                       KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+               switch (externalServiceType) {
+                       case ClusterIP:
+                       case LoadBalancer:
 
 Review comment:
   > However, in such case, we should wait for the load balancer ready and 
return the `EXTERNAL-IP:8081` as JobManager url. Otherwise, the Flink client 
will timeout and should clean-up the cluster resources.
   
   Currently, we clean up the K8s resources when an error occurs while 
creating the `Deployment`, `ConfigMap`, or `Service`. If we fail to retrieve 
the Endpoint, we throw an Exception instead of cleaning up all the resources, 
which I think is reasonable since the external Service is a bypass to some 
extent.
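
   The policy described above can be sketched roughly as follows. This is an illustration only, not the actual client code: `DeploySketch`, `Step`, and the `log` list are hypothetical names, and the real cleanup is replaced by a log entry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the failure policy; none of these names exist in Flink.
final class DeploySketch {

    // A step that may fail, e.g. creating resources or retrieving the Endpoint.
    interface Step {
        void run() throws Exception;
    }

    // Records what happened so the behavior is easy to observe.
    final List<String> log = new ArrayList<>();

    void deploy(Step createResources, Step retrieveEndpoint) throws Exception {
        try {
            createResources.run();
        } catch (Exception e) {
            // Creation failed: clean up whatever was created, then rethrow.
            log.add("cleanup");
            throw e;
        }
        // Endpoint retrieval failed: the exception propagates and the
        // already-created resources are left in place (no cleanup).
        retrieveEndpoint.run();
        log.add("endpoint-retrieved");
    }
}
```

   With this shape, a failure in the first step records `cleanup`, while a failure in the second step records nothing and only surfaces the exception.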
   
   Regarding whether the `Service` is ready: I previously left a question at 
https://github.com/apache/flink/pull/11233. I will file another ticket for 
further discussion and a fix.
   
   > Then for unmanaged K8s cluster without load balancer, we will enforce the 
users to set kubernetes.rest-service.exposed.type=NodePort explicitly, rather 
than return a confusing JobManager url MASTER_ADDRESS:8081. Also the submission 
will always fail with timeout.
   
   Sorry, I don't understand the core problem here. Could you provide more 
information?
   
   Lastly, I was planning to rework the implementation of 
`Fabric8FlinkKubeClient#getRestEndpoint` to clearly separate the handling of 
`NodePort` and `LB` Services, and to fix some bugs in that method along the 
way.
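
   To make the intended separation concrete, here is a minimal sketch of a per-type port lookup. It is not the code in this PR: `RestPortSketch`, `PortEntry`, and `ExposedType` are hypothetical stand-ins for the fabric8 model classes, the value `"rest"` for the port name is an assumption, and the branch bodies rely only on general Kubernetes semantics (a `NodePort` Service is reached via its node port, while `ClusterIP` and `LoadBalancer` Services are reached via the Service port).

```java
import java.util.List;

// Hypothetical stand-ins for the fabric8 Service model; not the real API.
final class RestPortSketch {

    enum ExposedType { ClusterIP, NodePort, LoadBalancer }

    // Minimal port entry: name, Service port, and node port (null if not allocated).
    static final class PortEntry {
        final String name;
        final int port;
        final Integer nodePort;

        PortEntry(String name, int port, Integer nodePort) {
            this.name = name;
            this.port = port;
            this.nodePort = nodePort;
        }
    }

    // Assumed value; stands in for Constants.REST_PORT_NAME.
    static final String REST_PORT_NAME = "rest";

    static int resolveRestPort(List<PortEntry> ports, ExposedType type) {
        // Look the port up by name; comparing from the constant side
        // avoids a NullPointerException for unnamed ports.
        PortEntry rest = ports.stream()
            .filter(p -> REST_PORT_NAME.equals(p.name))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException(
                "Failed to find port \"" + REST_PORT_NAME + "\" in Service"));

        switch (type) {
            case NodePort:
                // Reached through a node address, so the node port applies.
                if (rest.nodePort == null) {
                    throw new IllegalStateException("Node port is not allocated");
                }
                return rest.nodePort;
            case ClusterIP:
            case LoadBalancer:
                // Reached through the cluster IP or the load balancer address.
                return rest.port;
            default:
                throw new IllegalStateException("Unrecognized Service type: " + type);
        }
    }
}
```

   Keeping the lookup by name separate from the per-type branch means each Service type can later get its own address resolution without touching the port selection.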
