gyfora commented on code in PR #712: URL: https://github.com/apache/flink-kubernetes-operator/pull/712#discussion_r1416192622
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ########## @@ -1075,4 +1106,59 @@ protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) { status.getJobStatus().setState(JobStatus.FINISHED.name()); } } + + private Configuration getOperatorRestConfig(Configuration origConfig) throws IOException { + Configuration conf = new Configuration(origConfig); + EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH) + .ifPresent( + path -> { + if (Files.notExists(Paths.get(path))) { + return; + } + conf.set( + SecurityOptions.SSL_REST_TRUSTSTORE, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)); + conf.set( + SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + if (SecurityOptions.isRestSSLAuthenticationEnabled(conf) + && EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH) + .isPresent()) { + conf.set( + SecurityOptions.SSL_REST_KEYSTORE, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)); + conf.set( + SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, + EnvUtils.getRequired( + EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + conf.set( + SecurityOptions.SSL_REST_KEY_PASSWORD, + EnvUtils.getRequired( + EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + } else { + conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE); + conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD); + } + conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE); + conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); + conf.removeConfig(SecurityOptions.SSL_KEYSTORE); + conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD); + }); + return conf; + } + + private boolean isValidRuntimeException(Configuration conf, RuntimeException e) { + final Optional<String> trustStorePath = EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH); + // The ClusterDescriptors always try and create a RestClient from the config + // that would be given to the deployment. When SSL is enabled it will throw + // a ClusterRetrieveException as the operator does not have the certs where they + // would be mounted on the client + if (SecurityOptions.isRestSSLEnabled(conf) + && trustStorePath.isPresent() + && Files.exists(Paths.get(trustStorePath.get())) + && e.getCause() instanceof ClusterRetrieveException) { + return true; + } + return false; Review Comment: It’s normal to get exceptions on invalid configs . It is the expected behavior , we have handling for these errors I believe ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ########## @@ -1075,4 +1106,59 @@ protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) { status.getJobStatus().setState(JobStatus.FINISHED.name()); } } + + private Configuration getOperatorRestConfig(Configuration origConfig) throws IOException { + Configuration conf = new Configuration(origConfig); + EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH) + .ifPresent( + path -> { + if (Files.notExists(Paths.get(path))) { + return; + } + conf.set( + SecurityOptions.SSL_REST_TRUSTSTORE, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH)); + conf.set( + SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + if (SecurityOptions.isRestSSLAuthenticationEnabled(conf) + && EnvUtils.get(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH) + .isPresent()) { + conf.set( + SecurityOptions.SSL_REST_KEYSTORE, + EnvUtils.getRequired(EnvUtils.ENV_OPERATOR_KEYSTORE_PATH)); + conf.set( + SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, + EnvUtils.getRequired( + EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + conf.set( + SecurityOptions.SSL_REST_KEY_PASSWORD, + EnvUtils.getRequired( + EnvUtils.ENV_OPERATOR_KEYSTORE_PASSWORD)); + } else { + conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE); + conf.removeConfig(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD); + } + conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE); + conf.removeConfig(SecurityOptions.SSL_TRUSTSTORE_PASSWORD); + conf.removeConfig(SecurityOptions.SSL_KEYSTORE); + conf.removeConfig(SecurityOptions.SSL_KEYSTORE_PASSWORD); + }); + return conf; + } + + private boolean isValidRuntimeException(Configuration conf, RuntimeException e) { + final Optional<String> trustStorePath = EnvUtils.get(EnvUtils.ENV_OPERATOR_TRUSTSTORE_PATH); + // The ClusterDescriptors always try and create a RestClient from the config + // that would be given to the deployment. When SSL is enabled it will throw + // a ClusterRetrieveException as the operator does not have the certs where they + // would be mounted on the client + if (SecurityOptions.isRestSSLEnabled(conf) + && trustStorePath.isPresent() + && Files.exists(Paths.get(trustStorePath.get())) + && e.getCause() instanceof ClusterRetrieveException) { + return true; + } + return false; Review Comment: It’s normal to get exceptions on invalid configs . It is the expected behavior , we have handling for these errors I believe -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org