XComp commented on a change in pull request #15131:
URL: https://github.com/apache/flink/pull/15131#discussion_r607228165



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
##########
@@ -108,6 +108,19 @@
                                     + " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
                                     + " Kafka authentication)");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
+            key("security.kerberos.fetch.delegation-token")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Indicates whether to fetch delegation token for external services the Flink job needs to contact. "
+                                    + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. "
+                                    + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. "
+                                    + "If false, Flink will assume that the job has delegation tokens and will not fetch them. "
+                                    + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens "

Review comment:
       ```suggestion
                               "Indicates whether to fetch the delegation token for external services the Flink job needs to contact. "
                                       + "Only HDFS and HBase are supported. It is used in YARN deployments. "
                                       + "If true, Flink will fetch HDFS and HBase delegation tokens and inject them into Yarn AM container. "
                                       + "If false, Flink will assume that the delegation tokens are managed outside of Flink. As a consequence, it will not fetch delegation tokens for HDFS and HBase. "
                                       + "This applies to submission mechanisms like Apache Oozie, which will obtain delegation tokens "
   ```
   Here's a proposal to make the docs more explicit.

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
##########
@@ -197,21 +197,31 @@ private static LocalResource registerLocalResource(
     }
 
     public static void setTokensFor(
-            ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
+            ContainerLaunchContext amContainer,
+            List<Path> paths,
+            Configuration conf,
+            boolean obtainingDelegationTokens)
             throws IOException {
         Credentials credentials = new Credentials();
-        // for HDFS
-        TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
-        // for HBase
-        obtainTokenForHBase(credentials, conf);
+
+        if (obtainingDelegationTokens) {

Review comment:
       Would it make sense to have separate parameters for each of the services, similar to how Spark implements this feature? That's just an idea I came up with after browsing the Spark sources, where it's separated like that.
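   To make the per-service idea concrete, here's a minimal plain-Java sketch (the `security.kerberos.fetch.delegation-token.<service>` keys are hypothetical and not existing Flink options; Spark's analogous switches are `spark.security.credentials.<service>.enabled`):
   ```java
   import java.util.Map;

   // Sketch only: one flag per service instead of a single global
   // security.kerberos.fetch.delegation-token switch. Key names are made up
   // for illustration; a real Flink change would define ConfigOptions instead.
   public class PerServiceTokenFlags {

       static boolean fetchEnabled(Map<String, String> conf, String service) {
           // Default to true, mirroring the defaultValue(true) of the proposed option.
           return Boolean.parseBoolean(
                   conf.getOrDefault(
                           "security.kerberos.fetch.delegation-token." + service, "true"));
       }

       public static void main(String[] args) {
           Map<String, String> conf =
                   Map.of("security.kerberos.fetch.delegation-token.hbase", "false");
           System.out.println(fetchEnabled(conf, "hdfs"));  // not set, falls back to true
           System.out.println(fetchEnabled(conf, "hbase")); // explicitly disabled
       }
   }
   ```
   This would let users disable, say, only the HBase token fetch while keeping the HDFS one.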

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -1079,15 +1091,24 @@ private ApplicationReport startAppMaster(
 
         // setup security tokens
         if (UserGroupInformation.isSecurityEnabled()) {
-            // set HDFS delegation tokens when security is enabled
-            LOG.info("Adding delegation token to the AM container.");
-            List<Path> yarnAccessList =
-                    ConfigUtils.decodeListFromConfig(
-                            configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
+            List<Path> yarnAccessList = new ArrayList<>();
+
+            Boolean kerberosFetchDelegationTokenEnabled =
+                    configuration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
+
+            if (kerberosFetchDelegationTokenEnabled) {
+                // set HDFS delegation tokens when security is enabled
+                LOG.info("Adding delegation token to the AM container.");
+                yarnAccessList =
+                        ConfigUtils.decodeListFromConfig(
+                                configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
+            }
+
             Utils.setTokensFor(
                     amContainer,
                     ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
-                    yarnConfiguration);
+                    yarnConfiguration,
+                    kerberosFetchDelegationTokenEnabled);

Review comment:
       I'm wondering whether this change is actually necessary. Is there a specific reason for it? Aren't we implicitly ignoring the `YARN_ACCESS` setting in `Utils.setTokensFor`? Or am I missing something?

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -529,6 +530,17 @@ public void killCluster(ApplicationId applicationId) throws FlinkException {
                         "Hadoop security with Kerberos is enabled but the login user "
                                 + "does not have Kerberos credentials or delegation tokens!");
             }
+
+            boolean kerberosFetchDTEnabled =
+                    flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_FETCH_DELEGATION_TOKEN);
+            boolean yarnAccessFSEnabled =
+                    flinkConfiguration.get(YarnConfigOptions.YARN_ACCESS) != null;
+            if (!kerberosFetchDTEnabled && yarnAccessFSEnabled) {
+                throw new RuntimeException(

Review comment:
       Wouldn't a warning be sufficient? The `YARN_ACCESS` setting does not harm the system; it would simply be ignored if delegation tokens are disabled.
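   A rough sketch of what I have in mind (plain Java, with `java.util.logging` standing in for Flink's SLF4J logger; the method and message are made up): warn and continue instead of throwing.
   ```java
   import java.util.logging.Logger;

   // Sketch of the softer handling: log a warning and keep going when
   // YARN_ACCESS is set while delegation-token fetching is disabled.
   public class TokenConfigCheck {

       private static final Logger LOG = Logger.getLogger(TokenConfigCheck.class.getName());

       /** Returns true if the configuration combination is fully effective. */
       static boolean validate(boolean fetchDelegationTokens, boolean yarnAccessConfigured) {
           if (!fetchDelegationTokens && yarnAccessConfigured) {
               LOG.warning(
                       "YARN_ACCESS is configured but delegation-token fetching is "
                               + "disabled; the setting will be ignored.");
               return false; // submission still proceeds
           }
           return true;
       }
   }
   ```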

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
##########
@@ -108,6 +108,19 @@
                                     + " (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for"
                                     + " Kafka authentication)");
 
+    @Documentation.Section(Documentation.Sections.SECURITY_AUTH_KERBEROS)
+    public static final ConfigOption<Boolean> KERBEROS_FETCH_DELEGATION_TOKEN =
+            key("security.kerberos.fetch.delegation-token")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Indicates whether to fetch delegation token for external services the Flink job needs to contact. "
+                                    + "Only HDFS and HBase are supported, and only works for flink-yarn at the moment. "
+                                    + "If true, Flink will fetch HDFS/HBase delegation tokens and inject into Yarn AM container. "
+                                    + "If false, Flink will assume that the job has delegation tokens and will not fetch them. "
+                                    + "This applies to submission mechanisms like Oozie, which will obtain delegation tokens "

Review comment:
       AFAIU, Oozie schedules Flink job submissions and utilizes [Hadoop's ProxyUser feature](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html) to obtain a delegation token to access the components (in our case HDFS and HBase). As far as I understand it, the delegation token request usually triggered by Flink would fail if Apache Oozie handles the delegation tokens, since Oozie uses its own proxy user that impersonates the actual users.
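   For context, the ProxyUser mechanism from the linked Hadoop docs is enabled via `core-site.xml` entries like the following (hostname and group name are placeholders for illustration):
   ```xml
   <!-- core-site.xml: allow a superuser "oozie" to impersonate other users,
        per Hadoop's "Superusers Acting On Behalf Of Other Users" docs.
        Values below are example placeholders. -->
   <property>
     <name>hadoop.proxyuser.oozie.hosts</name>
     <value>oozie-host.example.com</value>
   </property>
   <property>
     <name>hadoop.proxyuser.oozie.groups</name>
     <value>submitters</value>
   </property>
   ```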




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

