gyfora commented on code in PR #712:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/712#discussion_r1416192622


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -1075,4 +1106,59 @@ protected void 
updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
             status.getJobStatus().setState(JobStatus.FINISHED.name());
         }
     }
+
+    private Configuration getOperatorRestConfig(Configuration origConfig) 
throws IOException {
+        Configuration conf = new Configuration(origConfig);
+        EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)
+                .ifPresent(
+                        path -> {
+                            if (Files.notExists(Paths.get(path))) {
+                                return;
+                            }
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE,
+                                    
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH));
+                            conf.set(
+                                    
SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD,
+                                    
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            if 
(SecurityOptions.isRestSSLAuthenticationEnabled(conf)
+                                    && 
EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)
+                                            .isPresent()) {
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE,
+                                        
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH));
+                                conf.set(
+                                        
SecurityOptions.SSL_REST_KEYSTORE_PASSWORD,
+                                        EnvUtils.getRequired(
+                                                
EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEY_PASSWORD,
+                                        EnvUtils.getRequired(
+                                                
EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            } else {
+                                
conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE);
+                                
conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD);
+                            }
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE);
+                            
conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE);
+                            
conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+                        });
+        return conf;
+    }
+
+    private boolean isValidRuntimeException(Configuration conf, 
RuntimeException e) {
+        final Optional<String> trustStorePath = 
EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH);
+        // The ClusterDescriptors always try and create a RestClient from the 
config
+        // that would be given to the deployment. When SSL is enabled it will 
throw
+        // a ClusterRetrieveException as the operator does not have the certs 
where they
+        // would be mounted on the client
+        if (SecurityOptions.isRestSSLEnabled(conf)
+                && trustStorePath.isPresent()
+                && Files.exists(Paths.get(trustStorePath.get()))
+                && e.getCause() instanceof ClusterRetrieveException) {
+            return true;
+        }
+        return false;

Review Comment:
   It’s normal to get exceptions on invalid configs . It is the expected 
behavior , we have handling for these errors I believe 



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##########
@@ -1075,4 +1106,59 @@ protected void 
updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
             status.getJobStatus().setState(JobStatus.FINISHED.name());
         }
     }
+
+    private Configuration getOperatorRestConfig(Configuration origConfig) 
throws IOException {
+        Configuration conf = new Configuration(origConfig);
+        EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)
+                .ifPresent(
+                        path -> {
+                            if (Files.notExists(Paths.get(path))) {
+                                return;
+                            }
+                            conf.set(
+                                    SecurityOptions.SSL_REST_TRUSTSTORE,
+                                    
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH));
+                            conf.set(
+                                    
SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD,
+                                    
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            if 
(SecurityOptions.isRestSSLAuthenticationEnabled(conf)
+                                    && 
EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)
+                                            .isPresent()) {
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEYSTORE,
+                                        
EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH));
+                                conf.set(
+                                        
SecurityOptions.SSL_REST_KEYSTORE_PASSWORD,
+                                        EnvUtils.getRequired(
+                                                
EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                                conf.set(
+                                        SecurityOptions.SSL_REST_KEY_PASSWORD,
+                                        EnvUtils.getRequired(
+                                                
EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD));
+                            } else {
+                                
conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE);
+                                
conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD);
+                            }
+                            conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE);
+                            
conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD);
+                            conf.removeConfig(SecurityOptions.SSL_KEYSTORE);
+                            
conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD);
+                        });
+        return conf;
+    }
+
+    private boolean isValidRuntimeException(Configuration conf, 
RuntimeException e) {
+        final Optional<String> trustStorePath = 
EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH);
+        // The ClusterDescriptors always try and create a RestClient from the 
config
+        // that would be given to the deployment. When SSL is enabled it will 
throw
+        // a ClusterRetrieveException as the operator does not have the certs 
where they
+        // would be mounted on the client
+        if (SecurityOptions.isRestSSLEnabled(conf)
+                && trustStorePath.isPresent()
+                && Files.exists(Paths.get(trustStorePath.get()))
+                && e.getCause() instanceof ClusterRetrieveException) {
+            return true;
+        }
+        return false;

Review Comment:
   It’s normal to get exceptions on invalid configs . It is the expected 
behavior , we have handling for these errors I believe 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to