lirui-apache commented on a change in pull request #14841: URL: https://github.com/apache/flink/pull/14841#discussion_r571603548
########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProvider.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.function.FunctionUtils; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.Master; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Delegation token provider implementation for Hadoop FileSystems. 
*/ +public class HadoopFSDelegationTokenProvider implements HadoopDelegationTokenProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class); + + @Override + public String serviceName() { + return "hadoopfs"; + } + + @Override + public boolean delegationTokensRequired( + Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) { + return UserGroupInformation.isSecurityEnabled(); + } + + @Override + public Optional<Long> obtainDelegationTokens( + Configuration flinkConf, + org.apache.hadoop.conf.Configuration hadoopConf, + Credentials credentials) { + try { + Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess(flinkConf, hadoopConf); + + final String renewer = getTokenRenewer(hadoopConf); + fileSystemsToAccess.forEach( + fs -> { + try { + LOG.info("Getting FS token for: {} with renewer {}", fs, renewer); + fs.addDelegationTokens(renewer, credentials); + } catch (IOException e) { + LOG.warn("Failed to get token for {}.", fs); + } + }); + } catch (IOException e) { + LOG.error("Failed to obtain tokens for Hadoop FileSystems: {}", e.getMessage()); Review comment: Shouldn't we error out if DT can't be obtained? I suppose the job would fail in that case. So it's better to fail fast. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HBaseDelegationTokenProvider.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Optional; + +/** Delegation token provider implementation for HBase. */ +public class HBaseDelegationTokenProvider implements HadoopDelegationTokenProvider { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class); + + private org.apache.hadoop.conf.Configuration hbaseConf; + + @Override + public String serviceName() { + return "hbase"; + } + + @Override + public boolean delegationTokensRequired( + Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) { + if (UserGroupInformation.isSecurityEnabled()) { + hbaseConf = createHBaseConfiguration(hadoopConf); + LOG.debug("HBase security setting: {}", hbaseConf.get("hbase.security.authentication")); Review comment: If this method is not frequently called, let's not change the log level here. Users may rely on the previous INFO level logs. ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopDelegationTokenProvider.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; + +import java.util.Optional; + +/** Hadoop delegation token provider. */ +public interface HadoopDelegationTokenProvider { + + /** Name of the service to provide delegation tokens. This name should be unique. */ + String serviceName(); + + /** + * Return true if delegation tokens are required for this service. + * + * @param flinkConf Flink configuration + * @param hadoopConf Hadoop configuration + * @return true if delegation tokens are required + */ + boolean delegationTokensRequired( Review comment: Both `delegationTokensRequired` and `obtainDelegationTokens` take some configuration parameters. I wonder what's the relationship between these configurations? Should `obtainDelegationTokens` check whether a DT is required before actually getting the DT? If the configurations are identical, maybe we can have an `init` method to configure the provider so that the other methods won't need to accept configuration parameters? 
########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProvider.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.function.FunctionUtils; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.Master; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Delegation token provider implementation for Hadoop FileSystems. 
*/ +public class HadoopFSDelegationTokenProvider implements HadoopDelegationTokenProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(HadoopFSDelegationTokenProvider.class); + + @Override + public String serviceName() { + return "hadoopfs"; + } + + @Override + public boolean delegationTokensRequired( + Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) { + return UserGroupInformation.isSecurityEnabled(); + } + + @Override + public Optional<Long> obtainDelegationTokens( + Configuration flinkConf, + org.apache.hadoop.conf.Configuration hadoopConf, + Credentials credentials) { + try { + Set<FileSystem> fileSystemsToAccess = getFileSystemsToAccess(flinkConf, hadoopConf); + + final String renewer = getTokenRenewer(hadoopConf); + fileSystemsToAccess.forEach( + fs -> { + try { + LOG.info("Getting FS token for: {} with renewer {}", fs, renewer); + fs.addDelegationTokens(renewer, credentials); + } catch (IOException e) { + LOG.warn("Failed to get token for {}.", fs); + } + }); + } catch (IOException e) { + LOG.error("Failed to obtain tokens for Hadoop FileSystems: {}", e.getMessage()); + } + // Flink does not support to renew the delegation token currently Review comment: If Flink doesn't support renewing a DT, why would we fail when a renewer can't be obtained in `getTokenRenewer`? ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HBaseDelegationTokenProvider.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; Review comment: Shouldn't this be placed in the HBase connector so that we can get rid of those reflections? ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HadoopDelegationTokenProvider.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; Review comment: Does this mean DT provider is only supported when running Flink on YARN? Is it possible to support this feature for other deployments, such as standalone or k8s? ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/security/HBaseDelegationTokenProvider.java ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.configuration.Configuration; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Optional; + +/** Delegation token provider implementation for HBase. 
*/ +public class HBaseDelegationTokenProvider implements HadoopDelegationTokenProvider { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseDelegationTokenProvider.class); + + private org.apache.hadoop.conf.Configuration hbaseConf; + + @Override + public String serviceName() { + return "hbase"; + } + + @Override + public boolean delegationTokensRequired( + Configuration flinkConf, org.apache.hadoop.conf.Configuration hadoopConf) { + if (UserGroupInformation.isSecurityEnabled()) { + hbaseConf = createHBaseConfiguration(hadoopConf); + LOG.debug("HBase security setting: {}", hbaseConf.get("hbase.security.authentication")); + + boolean required = "kerberos".equals(hbaseConf.get("hbase.security.authentication")); + if (!required) { + LOG.debug("HBase has not been configured to use Kerberos."); + } + return required; + } else { + return false; + } + } + + private org.apache.hadoop.conf.Configuration createHBaseConfiguration( + org.apache.hadoop.conf.Configuration conf) { + try { + // ---- + // Intended call: HBaseConfiguration.create(conf); + return (org.apache.hadoop.conf.Configuration) + Class.forName("org.apache.hadoop.hbase.HBaseConfiguration") + .getMethod("create", org.apache.hadoop.conf.Configuration.class) + .invoke(null, conf); + // ---- + + } catch (InvocationTargetException + | NoSuchMethodException + | IllegalAccessException + | ClassNotFoundException e) { + LOG.info( + "HBase is not available (not packaged with this application): {} : \"{}\".", + e.getClass().getSimpleName(), + e.getMessage()); + } + return conf; + } + + @Override + public Optional<Long> obtainDelegationTokens( + Configuration flinkConf, + org.apache.hadoop.conf.Configuration hadoopConf, + Credentials credentials) { + Token<?> token; + try { + try { + LOG.info("Obtaining Kerberos security token for HBase"); + // ---- + // Intended call: Token<AuthenticationTokenIdentifier> token = + // TokenUtil.obtainToken(conf); + token = + (Token<?>) + 
Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil") + .getMethod( + "obtainToken", + org.apache.hadoop.conf.Configuration.class) + .invoke(null, hbaseConf); + } catch (NoSuchMethodException e) { + // for HBase 2 + + // ---- + // Intended call: ConnectionFactory connectionFactory = + // ConnectionFactory.createConnection(conf); + Closeable connectionFactory = + (Closeable) + Class.forName("org.apache.hadoop.hbase.client.ConnectionFactory") + .getMethod( + "createConnection", + org.apache.hadoop.conf.Configuration.class) + .invoke(null, hbaseConf); + // ---- + Class<?> connectionClass = + Class.forName("org.apache.hadoop.hbase.client.Connection"); + // ---- + // Intended call: Token<AuthenticationTokenIdentifier> token = + // TokenUtil.obtainToken(connectionFactory); + token = + (Token<?>) + Class.forName("org.apache.hadoop.hbase.security.token.TokenUtil") + .getMethod("obtainToken", connectionClass) + .invoke(null, connectionFactory); + if (null != connectionFactory) { + connectionFactory.close(); + } + } + if (token == null) { + LOG.error("No Kerberos security token for HBase available"); Review comment: Similar to the FS DT, shouldn't we fail here? ########## File path: flink-yarn/src/test/java/org/apache/flink/yarn/security/HadoopFSDelegationTokenProviderTest.java ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.security; + +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; +import sun.security.krb5.KrbException; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +/** Tests for {@link HadoopFSDelegationTokenProvider}. 
*/ +public class HadoopFSDelegationTokenProviderTest { + + public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; + private final org.apache.flink.configuration.Configuration flinkConf = + new org.apache.flink.configuration.Configuration(); + + @Test + public void testShouldReturnFalseWhenSecurityIsNotEnabled() { + HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider(); + + final Configuration hadoopConf = new Configuration(); + assumeTrue("simple".equals(hadoopConf.get(HADOOP_SECURITY_AUTHENTICATION))); + assertFalse( + "Hadoop FS delegation tokens are not required when authentication is simple", + provider.delegationTokensRequired(flinkConf, hadoopConf)); + } + + @Test + public void testShouldReturnTrueWhenSecurityIsEnabled() throws KrbException { + // fake the realm when kerberos is enabled + System.setProperty("java.security.krb5.kdc", ""); + System.setProperty("java.security.krb5.realm", "DEFAULT.REALM"); + System.setProperty("java.security.krb5.conf", "/dev/null"); + sun.security.krb5.Config.refresh(); + + final Configuration hadoopConf = new Configuration(); + // set new hadoop conf to UGI to re-initialize it + hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + try { + UserGroupInformation.setConfiguration(hadoopConf); + HadoopFSDelegationTokenProvider provider = new HadoopFSDelegationTokenProvider(); + assertTrue( + "Hadoop FS delegation tokens are required when authentication is not simple", + provider.delegationTokensRequired(flinkConf, hadoopConf)); + } finally { + // restore the default UGI + UserGroupInformation.setConfiguration(new Configuration()); Review comment: Do we also need to restore the system properties? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org