lirui-apache commented on a change in pull request #14841:
URL: https://github.com/apache/flink/pull/14841#discussion_r571603548



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProvider.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Delegation token provider implementation for Hadoop FileSystems. */
+public class HadoopFSDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);
+
+    @Override
+    public String serviceName() {
+        return "hadoopfs";
+    }
+
+    @Override
+    public boolean delegationTokensRequired(
+            Configuration flinkConf, org.apache.hadoop.conf.Configuration 
hadoopConf) {
+        return UserGroupInformation.isSecurityEnabled();
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(
+            Configuration flinkConf,
+            org.apache.hadoop.conf.Configuration hadoopConf,
+            Credentials credentials) {
+        try {
+            Set<FileSystem> fileSystemsToAccess = 
getFileSystemsToAccess(flinkConf, hadoopConf);
+
+            final String renewer = getTokenRenewer(hadoopConf);
+            fileSystemsToAccess.forEach(
+                    fs -> {
+                        try {
+                            LOG.info("Getting FS token for: {} with renewer 
{}", fs, renewer);
+                            fs.addDelegationTokens(renewer, credentials);
+                        } catch (IOException e) {
+                            LOG.warn("Failed to get token for {}.", fs);
+                        }
+                    });
+        } catch (IOException e) {
+            LOG.error("Failed to obtain tokens for Hadoop FileSystems: {}", 
e.getMessage());

Review comment:
       Shouldn't we error out if the DT can't be obtained? I suppose the job would 
fail in that case anyway, so it's better to fail fast.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HBaseDelegationTokenProvider.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    private org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public boolean delegationTokensRequired(
+            Configuration flinkConf, org.apache.hadoop.conf.Configuration 
hadoopConf) {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            hbaseConf = createHBaseConfiguration(hadoopConf);
+            LOG.debug("HBase security setting: {}", 
hbaseConf.get("hbase.security.authentication"));

Review comment:
       If this method is not frequently called, let's not change the log level 
here. Users may rely on the previous INFO level logs.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopDelegationTokenProvider.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.security.Credentials;
+
+import java.util.Optional;
+
+/** Hadoop delegation token provider. */
+public interface HadoopDelegationTokenProvider {
+
+    /** Name of the service to provide delegation tokens. This name should be 
unique. */
+    String serviceName();
+
+    /**
+     * Return true if delegation tokens are required for this service.
+     *
+     * @param flinkConf Flink configuration
+     * @param hadoopConf Hadoop configuration
+     * @return true if delegation tokens are required
+     */
+    boolean delegationTokensRequired(

Review comment:
       Both `delegationTokensRequired` and `obtainDelegationTokens` take some 
configuration parameters. I wonder what's the relationship between these 
configurations? Should `obtainDelegationTokens` check whether a DT is required 
before actually getting the DT? If the configurations are identical, maybe we can 
have an `init` method to configure the provider, so that the other methods won't 
need to accept configuration parameters?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProvider.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Delegation token provider implementation for Hadoop FileSystems. */
+public class HadoopFSDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class);
+
+    @Override
+    public String serviceName() {
+        return "hadoopfs";
+    }
+
+    @Override
+    public boolean delegationTokensRequired(
+            Configuration flinkConf, org.apache.hadoop.conf.Configuration 
hadoopConf) {
+        return UserGroupInformation.isSecurityEnabled();
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(
+            Configuration flinkConf,
+            org.apache.hadoop.conf.Configuration hadoopConf,
+            Credentials credentials) {
+        try {
+            Set<FileSystem> fileSystemsToAccess = 
getFileSystemsToAccess(flinkConf, hadoopConf);
+
+            final String renewer = getTokenRenewer(hadoopConf);
+            fileSystemsToAccess.forEach(
+                    fs -> {
+                        try {
+                            LOG.info("Getting FS token for: {} with renewer 
{}", fs, renewer);
+                            fs.addDelegationTokens(renewer, credentials);
+                        } catch (IOException e) {
+                            LOG.warn("Failed to get token for {}.", fs);
+                        }
+                    });
+        } catch (IOException e) {
+            LOG.error("Failed to obtain tokens for Hadoop FileSystems: {}", 
e.getMessage());
+        }
+        // Flink does not support to renew the delegation token currently

Review comment:
       If Flink doesn't support renewing a DT, why would we fail when a renewer 
can't be obtained in `getTokenRenewer`?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HBaseDelegationTokenProvider.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;

Review comment:
       Shouldn't this be placed in the HBase connector, so that we can get rid of 
those reflections?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopDelegationTokenProvider.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;

Review comment:
       Does this mean the DT provider is only supported when running Flink on YARN? 
Is it possible to support this feature for other deployments, such as 
standalone or k8s?

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/security/HBaseDelegationTokenProvider.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Optional;
+
+/** Delegation token provider implementation for HBase. */
+public class HBaseDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseDelegationTokenProvider.class);
+
+    private org.apache.hadoop.conf.Configuration hbaseConf;
+
+    @Override
+    public String serviceName() {
+        return "hbase";
+    }
+
+    @Override
+    public boolean delegationTokensRequired(
+            Configuration flinkConf, org.apache.hadoop.conf.Configuration 
hadoopConf) {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            hbaseConf = createHBaseConfiguration(hadoopConf);
+            LOG.debug("HBase security setting: {}", 
hbaseConf.get("hbase.security.authentication"));
+
+            boolean required = 
"kerberos".equals(hbaseConf.get("hbase.security.authentication"));
+            if (!required) {
+                LOG.debug("HBase has not been configured to use Kerberos.");
+            }
+            return required;
+        } else {
+            return false;
+        }
+    }
+
+    private org.apache.hadoop.conf.Configuration createHBaseConfiguration(
+            org.apache.hadoop.conf.Configuration conf) {
+        try {
+            // ----
+            // Intended call: HBaseConfiguration.create(conf);
+            return (org.apache.hadoop.conf.Configuration)
+                    Class.forName("org.apache.hadoop.hbase.HBaseConfiguration")
+                            .getMethod("create", 
org.apache.hadoop.conf.Configuration.class)
+                            .invoke(null, conf);
+            // ----
+
+        } catch (InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException
+                | ClassNotFoundException e) {
+            LOG.info(
+                    "HBase is not available (not packaged with this 
application): {} : \"{}\".",
+                    e.getClass().getSimpleName(),
+                    e.getMessage());
+        }
+        return conf;
+    }
+
+    @Override
+    public Optional<Long> obtainDelegationTokens(
+            Configuration flinkConf,
+            org.apache.hadoop.conf.Configuration hadoopConf,
+            Credentials credentials) {
+        Token<?> token;
+        try {
+            try {
+                LOG.info("Obtaining Kerberos security token for HBase");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(conf);
+                token =
+                        (Token<?>)
+                                
Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod(
+                                                "obtainToken",
+                                                
org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+            } catch (NoSuchMethodException e) {
+                // for HBase 2
+
+                // ----
+                // Intended call: ConnectionFactory connectionFactory =
+                // ConnectionFactory.createConnection(conf);
+                Closeable connectionFactory =
+                        (Closeable)
+                                
Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory")
+                                        .getMethod(
+                                                "createConnection",
+                                                
org.apache.hadoop.conf.Configuration.class)
+                                        .invoke(null, hbaseConf);
+                // ----
+                Class<?> connectionClass =
+                        
Class.forName("org.apache.hadoop.hbase.client.Connection");
+                // ----
+                // Intended call: Token<AuthenticationTokenIdentifier> token =
+                // TokenUtil.obtainToken(connectionFactory);
+                token =
+                        (Token<?>)
+                                
Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
+                                        .getMethod("obtainToken", 
connectionClass)
+                                        .invoke(null, connectionFactory);
+                if (null != connectionFactory) {
+                    connectionFactory.close();
+                }
+            }
+            if (token == null) {
+                LOG.error("No Kerberos security token for HBase available");

Review comment:
       Similar to the FS DT, shouldn't we fail here?

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProviderTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.security;
+
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+import sun.security.krb5.KrbException;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/** Tests for {@link HadoopFSDelegationTokenProvider}. */
+public class HadoopFSDelegationTokenProviderTest {
+
+    public static final String HADOOP_SECURITY_AUTHENTICATION = 
"hadoop.security.authentication";
+    private final org.apache.flink.configuration.Configuration flinkConf =
+            new org.apache.flink.configuration.Configuration();
+
+    @Test
+    public void testShouldReturnFalseWhenSecurityIsNotEnabled() {
+        HadoopFSDelegationTokenProvider provider = new 
HadoopFSDelegationTokenProvider();
+
+        final Configuration hadoopConf = new Configuration();
+        
assumeTrue("simple".equals(hadoopConf.get(HADOOP_SECURITY_AUTHENTICATION)));
+        assertFalse(
+                "Hadoop FS delegation tokens are not required when 
authentication is simple",
+                provider.delegationTokensRequired(flinkConf, hadoopConf));
+    }
+
+    @Test
+    public void testShouldReturnTrueWhenSecurityIsEnabled() throws 
KrbException {
+        // fake the realm when kerberos is enabled
+        System.setProperty("java.security.krb5.kdc", "");
+        System.setProperty("java.security.krb5.realm", "DEFAULT.REALM");
+        System.setProperty("java.security.krb5.conf", "/dev/null");
+        sun.security.krb5.Config.refresh();
+
+        final Configuration hadoopConf = new Configuration();
+        // set new hadoop conf to UGI to re-initialize it
+        hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+        try {
+            UserGroupInformation.setConfiguration(hadoopConf);
+            HadoopFSDelegationTokenProvider provider = new 
HadoopFSDelegationTokenProvider();
+            assertTrue(
+                    "Hadoop FS delegation tokens are required when 
authentication is not simple",
+                    provider.delegationTokensRequired(flinkConf, hadoopConf));
+        } finally {
+            // restore the default UGI
+            UserGroupInformation.setConfiguration(new Configuration());

Review comment:
       Do we also need to restore the system properties?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to