XComp commented on a change in pull request #15131: URL: https://github.com/apache/flink/pull/15131#discussion_r607228165
########## File path: flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ########## @@ -108,6 +108,19 @@ + " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for" + " Kafka authentication)"); + @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS) + public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN = + key("security.kerberos.fetch.delegation-token") + .booleanType() + .defaultValue(true) + .withDescription( + "Indicates whether to fetch delegation token for external services the Flink job needs to contact. " + + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. " + + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. " + + "If false, Flink will assume that the job has delegation tokens and will not fetch them. " + + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens " Review comment: ```suggestion "Indicates whether to fetch the delegation token for external services the Flink job needs to contact. " + "Only HDFS and HBase are supported. It is used in YARN deployments. " + "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM container. " + "If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase." + "This applies to submission mechanisms like Apache Oozie, which will obtain delegation tokens " ``` Here's a proposal to make the docs more explicit. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ########## @@ -197,21 +197,31 @@ private static LocalResource registerLocalResource( } public static void setTokensFor( - ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) + ContainerLaunchContext amContainer, + List<Path> paths, + Configuration conf, + boolean obtainingDelegationTokens) throws IOException { Credentials credentials = new Credentials(); - // for HDFS - TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf); - // for HBase - obtainTokenForHBase(credentials, conf); + + if (obtainingDelegationTokens) { Review comment: Would it make sense to have separate parameters for each of the services similar to how Spark is implementing this feature? That's just an idea I came up with after browsing the Spark sources where it's separated like that. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -1079,15 +1091,24 @@ private ApplicationReport startAppMaster( // setup security tokens if (UserGroupInformation.isSecurityEnabled()) { - // set HDFS delegation tokens when security is enabled - LOG.info("Adding delegation token to the AM container."); - List<Path> yarnAccessList = - ConfigUtils.decodeListFromConfig( - configuration, YarnConfigOptions.YARN_ACCESS, Path::new); + List<Path> yarnAccessList = new ArrayList<>(); + + Boolean kerberosFetchDelegationTokenEnabled = + configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN); + + if (kerberosFetchDelegationTokenEnabled) { + // set HDFS delegation tokens when security is enabled + LOG.info("Adding delegation token to the AM container."); + yarnAccessList = + ConfigUtils.decodeListFromConfig( + configuration, YarnConfigOptions.YARN_ACCESS, Path::new); + } + Utils.setTokensFor( amContainer, ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()), - yarnConfiguration); + yarnConfiguration, + kerberosFetchDelegationTokenEnabled); Review comment: I'm wondering whether this change is actually necessary? Is there a specific reason for it? Aren't we implicitly ignoring the `YARN_ACCESS` setting in `Utils.setTokensFor`? Or am I missing something? ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -529,6 +530,17 @@ public void killCluster(ApplicationId applicationId) throws FlinkException { "Hadoop security with Kerberos is enabled but the login user " + "does not have Kerberos credentials or delegation tokens!"); } + + boolean kerberosFetchDTEnabled = + flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN); + boolean yarnAccessFSEnabled = + flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS) != null; + if (!kerberosFetchDTEnabled && yarnAccessFSEnabled) { + throw new RuntimeException( Review comment: Wouldn't be a warning sufficient enough? The `YARN_ACCESS` setting does not harm the system. It would be just ignored if delegation tokens are disabled. ########## File path: flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ########## @@ -108,6 +108,19 @@ + " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for" + " Kafka authentication)"); + @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS) + public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN = + key("security.kerberos.fetch.delegation-token") + .booleanType() + .defaultValue(true) + .withDescription( + "Indicates whether to fetch delegation token for external services the Flink job needs to contact. " + + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. " + + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. " + + "If false, Flink will assume that the job has delegation tokens and will not fetch them. " + + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens " Review comment: AFAIU, Oozie schedules Flink job submissions and utilizes [Hadoop's ProxyUser feature](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html) to obtain an delegation token to access the components (in our case HDFS and Hbase). As far as I understand it, the delegation token request usually triggered by Flink would fail if Apache Oozie handles the delegation tokens as it uses it's own Proxy User that impersonates the actual users. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org