This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b0943064e04 [fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#38607)
b0943064e04 is described below

commit b0943064e0451b1f2302653313e8cba1aa663f9b
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Thu Aug 1 14:01:32 2024 +0800

    [fix](kerberos)fix and refactor ugi login for kerberos and simple authentication (#38607)

    pick from (#37301)
---
 .../kerberos/common/conf/doris-krb5.conf           |   2 +-
 .../authentication/AuthenticationConfig.java       |   2 +
 ...icationConfig.java => HadoopAuthenticator.java} |  32 ++--
 .../HadoopKerberosAuthenticator.java               | 192 +++++++++++++++++++++
 .../authentication/HadoopSimpleAuthenticator.java  |  47 +++++
 .../common/security/authentication/HadoopUGI.java  |  89 ++--------
 ....java => ImpersonatingHadoopAuthenticator.java} |  29 ++--
 .../KerberosAuthenticationConfig.java              |   3 +
 .../doris/datasource/hive/HMSCachedClient.java     |   5 +
 .../doris/datasource/hive/HMSExternalCatalog.java  |  12 +-
 .../doris/datasource/hive/HiveMetadataOps.java     |   7 +-
 .../datasource/hive/ThriftHMSCachedClient.java     |  12 +-
 .../apache/doris/fs/remote/RemoteFileSystem.java   |  14 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  48 ++++--
 .../kerberos/test_two_hive_kerberos.groovy         |  33 ++++
 15 files changed, 410 insertions(+), 117 deletions(-)

diff --git a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
index 7624b94e6ad..36547b8f89d 100644
--- a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
+++ b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
@@ -24,7 +24,7 @@
     default_realm = LABS.TERADATA.COM
     dns_lookup_realm = false
     dns_lookup_kdc = false
-    ticket_lifetime = 24h
+    ticket_lifetime = 5s
     # this setting is causing a Message stream modified (41) error when talking to KDC running on CentOS 7: https://stackoverflow.com/a/60978520
     # renew_lifetime = 7d
     forwardable = true
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
index 32a27b2263a..875ae4542e1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
@@ -26,6 +26,7 @@ public abstract class AuthenticationConfig {
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
     public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
     public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
+    public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
 
     /**
      * @return true if the config is valid, otherwise false.
@@ -57,6 +58,7 @@ public abstract class AuthenticationConfig {
             krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey));
             krbConfig.setKerberosKeytab(conf.get(krbKeytabKey));
             krbConfig.setConf(conf);
+            krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, "false")));
             return krbConfig;
         } else {
             // AuthType.SIMPLE
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
similarity index 51%
copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index 722cd0352b7..c3cab5f410b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -17,18 +17,28 @@
 
 package org.apache.doris.common.security.authentication;
 
-import lombok.Data;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 
-@Data
-public class KerberosAuthenticationConfig extends AuthenticationConfig {
-    private String kerberosPrincipal;
-    private String kerberosKeytab;
-    private Configuration conf;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
-    @Override
-    public boolean isValid() {
-        return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab);
+public interface HadoopAuthenticator {
+
+    UserGroupInformation getUGI() throws IOException;
+
+    default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException {
+        try {
+            return getUGI().doAs(action);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig config) {
+        if (config instanceof KerberosAuthenticationConfig) {
+            return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
+        } else {
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config);
+        }
     }
 }
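For orientation, here is a minimal sketch of how a caller can drive the new interface end to end. The configuration values are invented, and the assumption that the standard Hadoop key "hadoop.security.authentication" selects the Kerberos path is mine, not something this commit states; the principal/keytab key names follow the constants in AuthenticationConfig.

import org.apache.hadoop.conf.Configuration;

import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;

public class AuthenticatorSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical values: with a principal and keytab present,
        // getKerberosConfig yields a KerberosAuthenticationConfig and the
        // factory returns a HadoopKerberosAuthenticator; otherwise the
        // simple (username-based) path is chosen.
        conf.set("hadoop.security.authentication", "kerberos");
        conf.set("hadoop.kerberos.principal", "doris/_HOST@EXAMPLE.COM");
        conf.set("hadoop.kerberos.keytab", "/etc/doris/doris.keytab");
        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(conf);
        HadoopAuthenticator auth = HadoopAuthenticator.getHadoopAuthenticator(config);
        // doAs runs the action as the logged-in subject and converts
        // InterruptedException into IOException, per the default method above.
        String name = auth.doAs(() -> auth.getUGI().getUserName());
        System.out.println("effective user: " + name);
    }
}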
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
new file mode 100644
index 00000000000..90c7927f5a6
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+public class HadoopKerberosAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopKerberosAuthenticator.class);
+    private final KerberosAuthenticationConfig config;
+    private Subject subject;
+    private long nextRefreshTime;
+    private UserGroupInformation ugi;
+
+    public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) {
+        this.config = config;
+    }
+
+    public static void initializeAuthConfig(Configuration hadoopConf) {
+        hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+        synchronized (HadoopKerberosAuthenticator.class) {
+            // avoid other catalog set conf at the same time
+            UserGroupInformation.setConfiguration(hadoopConf);
+        }
+    }
+
+    @Override
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            subject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(), config.isPrintDebugLog());
+            ugi = Objects.requireNonNull(login(subject), "login result is null");
+            return ugi;
+        }
+        if (nextRefreshTime < System.currentTimeMillis()) {
+            long lastRefreshTime = nextRefreshTime;
+            Subject existingSubject = subject;
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(subject);
+                LOG.debug("Current ticket expired time is {}", lastTicketEndTime);
+            }
+            // renew subject
+            Subject newSubject = getSubject(config.getKerberosKeytab(), config.getKerberosPrincipal(),
+                    config.isPrintDebugLog());
+            Objects.requireNonNull(login(newSubject), "re-login result is null");
+            // modify UGI instead of returning new UGI
+            existingSubject.getPrincipals().addAll(newSubject.getPrincipals());
+            Set<Object> privateCredentials = existingSubject.getPrivateCredentials();
+            // clear the old credentials
+            synchronized (privateCredentials) {
+                privateCredentials.clear();
+                privateCredentials.addAll(newSubject.getPrivateCredentials());
+            }
+            Set<Object> publicCredentials = existingSubject.getPublicCredentials();
+            synchronized (publicCredentials) {
+                publicCredentials.clear();
+                publicCredentials.addAll(newSubject.getPublicCredentials());
+            }
+            nextRefreshTime = calculateNextRefreshTime(newSubject);
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(newSubject);
+                LOG.debug("Next ticket expired time is {}", lastTicketEndTime);
+                LOG.debug("Refresh kerberos ticket succeeded, last time is {}, next time is {}",
+                        lastRefreshTime, nextRefreshTime);
+            }
+        }
+        return ugi;
+    }
+
+    private UserGroupInformation login(Subject subject) throws IOException {
+        // login and get ugi when catalog is initialized
+        initializeAuthConfig(config.getConf());
+        String principal = config.getKerberosPrincipal();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by kerberos authentication with principal: {}", principal);
+        }
+        return UserGroupInformation.getUGIFromSubject(subject);
+    }
+
+    private static long calculateNextRefreshTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = getTicketGrantingTicket(subject);
+        return getRefreshTime(tgtTicket);
+    }
+
+    private static Date getTicketEndTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present in kerberos based UGI");
+        KerberosTicket tgtTicket = getTicketGrantingTicket(subject);
+        return tgtTicket.getEndTime();
+    }
+
+    public static KerberosTicket getTicketGrantingTicket(Subject subject) {
+        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            if (isOriginalTicketGrantingTicket(ticket)) {
+                return ticket;
+            }
+        }
+        throw new IllegalArgumentException("kerberos ticket not found in " + subject);
+    }
+
+    public static boolean isOriginalTicketGrantingTicket(KerberosTicket ticket) {
+        return isTicketGrantingServerPrincipal(ticket.getServer());
+    }
+
+    private static boolean isTicketGrantingServerPrincipal(KerberosPrincipal principal) {
+        if (principal == null) {
+            return false;
+        }
+        if (principal.getName().equals("krbtgt/" + principal.getRealm() + "@" + principal.getRealm())) {
+            return true;
+        }
+        return false;
+    }
+
+    public static long getRefreshTime(KerberosTicket ticket) {
+        long start = ticket.getStartTime().getTime();
+        long end = ticket.getEndTime().getTime();
+        return start + (long) ((end - start) * 0.8f);
+    }
+
+    private static Subject getSubject(String keytab, String principal, boolean printDebugLog) {
+        Subject subject = new Subject(false, ImmutableSet.of(new KerberosPrincipal(principal)),
+                Collections.emptySet(), Collections.emptySet());
+        javax.security.auth.login.Configuration conf = getConfiguration(keytab, principal, printDebugLog);
+        try {
+            LoginContext loginContext = new LoginContext("", subject, null, conf);
+            loginContext.login();
+            return loginContext.getSubject();
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static javax.security.auth.login.Configuration getConfiguration(String keytab, String principal,
+            boolean printDebugLog) {
+        return new javax.security.auth.login.Configuration() {
+            @Override
+            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+                ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder()
+                        .put("doNotPrompt", "true")
+                        .put("isInitiator", "true")
+                        .put("useKeyTab", "true")
+                        .put("storeKey", "true")
+                        .put("keyTab", keytab)
+                        .put("principal", principal);
+                if (printDebugLog) {
+                    builder.put("debug", "true");
+                }
+                Map<String, String> options = builder.build();
+                return new AppConfigurationEntry[]{
+                        new AppConfigurationEntry(
+                                "com.sun.security.auth.module.Krb5LoginModule",
+                                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                options)};
+            }
+        };
+    }
+}
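The refresh cadence above comes from getRefreshTime, which schedules re-login at 80% of the TGT lifetime. A toy calculation with invented ticket times, using the same arithmetic as the method:

public class RefreshRuleSketch {
    public static void main(String[] args) {
        // Same formula as getRefreshTime, with made-up ticket times.
        long start = 1_000_000L;                  // ticket start, epoch millis
        long end = start + 10 * 60 * 60 * 1000L;  // 10-hour ticket lifetime
        long refresh = start + (long) ((end - start) * 0.8f);
        // The first getUGI() call after this instant re-runs the JAAS login and
        // swaps principals/credentials into the existing Subject in place, so
        // cached UGI references stay valid across the refresh.
        System.out.println(refresh - start);      // 28800000 ms = 8 hours
    }
}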
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
new file mode 100644
index 00000000000..fbe0d0aba7d
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HadoopSimpleAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = LogManager.getLogger(HadoopSimpleAuthenticator.class);
+    private final UserGroupInformation ugi;
+
+    public HadoopSimpleAuthenticator(SimpleAuthenticationConfig config) {
+        String hadoopUserName = config.getUsername();
+        if (hadoopUserName == null) {
+            hadoopUserName = "hadoop";
+            config.setUsername(hadoopUserName);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} is unset, use default user: hadoop", AuthenticationConfig.HADOOP_USER_NAME);
+            }
+        }
+        ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName);
+        }
+    }
+
+    @Override
+    public UserGroupInformation getUGI() {
+        return ugi;
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
index 1a86b9e327a..d04d772728b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
@@ -18,8 +18,6 @@
 package org.apache.doris.common.security.authentication;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +25,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+@Deprecated
 public class HadoopUGI {
     private static final Logger LOG = LogManager.getLogger(HadoopUGI.class);
 
@@ -39,82 +38,30 @@ public class HadoopUGI {
         if (config == null || !config.isValid()) {
             return null;
         }
-        UserGroupInformation ugi;
         if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config;
-            Configuration hadoopConf = krbConfig.getConf();
-            hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-            hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true");
-            UserGroupInformation.setConfiguration(hadoopConf);
-            String principal = krbConfig.getKerberosPrincipal();
             try {
-                // login hadoop with keytab and try checking TGT
-                ugi = UserGroupInformation.getLoginUser();
-                LOG.debug("Current login user: {}", ugi.getUserName());
-                if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) {
-                    // if the current user is logged by kerberos and is the same user
-                    // just use checkTGTAndReloginFromKeytab because this method will only relogin
-                    // when the TGT is expired or is close to expiry
-                    ugi.checkTGTAndReloginFromKeytab();
-                    return ugi;
+                // TODO: remove after iceberg and hudi kerberos test case pass
+                try {
+                    // login hadoop with keytab and try checking TGT
+                    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                    LOG.debug("Current login user: {}", ugi.getUserName());
+                    String principal = ((KerberosAuthenticationConfig) config).getKerberosPrincipal();
+                    if (ugi.hasKerberosCredentials() && StringUtils.equals(ugi.getUserName(), principal)) {
+                        // if the current user is logged by kerberos and is the same user
+                        // just use checkTGTAndReloginFromKeytab because this method will only relogin
+                        // when the TGT is expired or is close to expiry
+                        ugi.checkTGTAndReloginFromKeytab();
+                        return ugi;
+                    }
+                } catch (IOException e) {
+                    LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
                 }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with kerberos, do login immediately.", e);
-            }
-            try {
-                ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, krbConfig.getKerberosKeytab());
-                UserGroupInformation.setLoginUser(ugi);
-                LOG.debug("Login by kerberos authentication with principal: {}", principal);
-                return ugi;
+                return new HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config).getUGI();
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
         } else {
-            String hadoopUserName = ((SimpleAuthenticationConfig) config).getUsername();
-            if (hadoopUserName == null) {
-                hadoopUserName = "hadoop";
-                ((SimpleAuthenticationConfig) config).setUsername(hadoopUserName);
-                LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, use default user: hadoop");
-            }
-
-            try {
-                ugi = UserGroupInformation.getLoginUser();
-                if (ugi.getUserName().equals(hadoopUserName)) {
-                    return ugi;
-                }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with simple, do login immediately.", e);
-            }
-
-            ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
-            UserGroupInformation.setLoginUser(ugi);
-            LOG.debug("Login by proxy user, hadoop.username: {}", hadoopUserName);
-            return ugi;
-        }
-    }
-
-    /**
-     * use for HMSExternalCatalog to login
-     * @param config auth config
-     */
-    public static void tryKrbLogin(String catalogName, AuthenticationConfig config) {
-        if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = (KerberosAuthenticationConfig) config;
-            try {
-                Configuration hadoopConf = krbConfig.getConf();
-                hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-                hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED, "true");
-                UserGroupInformation.setConfiguration(hadoopConf);
-                /**
-                 * Because metastore client is created by using
-                 * {@link org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
-                 * it will relogin when TGT is expired, so we don't need to relogin manually.
-                 */
-                UserGroupInformation.loginUserFromKeytab(krbConfig.getKerberosPrincipal(),
-                        krbConfig.getKerberosKeytab());
-            } catch (IOException e) {
-                throw new RuntimeException("login with kerberos auth failed for catalog: " + catalogName, e);
-            }
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) config).getUGI();
         }
     }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
similarity index 54%
copy from fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
copy to fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
index 722cd0352b7..10e42f4bc67 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
@@ -17,18 +17,27 @@
 
 package org.apache.doris.common.security.authentication;
 
-import lombok.Data;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 
-@Data
-public class KerberosAuthenticationConfig extends AuthenticationConfig {
-    private String kerberosPrincipal;
-    private String kerberosKeytab;
-    private Configuration conf;
+import java.io.IOException;
+import java.util.Objects;
+
+public class ImpersonatingHadoopAuthenticator implements HadoopAuthenticator {
+
+    private final HadoopAuthenticator delegate;
+    private final String username;
+    private UserGroupInformation ugi;
+
+    public ImpersonatingHadoopAuthenticator(HadoopAuthenticator delegate, String username) {
+        this.delegate = Objects.requireNonNull(delegate);
+        this.username = Objects.requireNonNull(username);
+    }
 
     @Override
-    public boolean isValid() {
-        return StringUtils.isNotEmpty(kerberosPrincipal) && StringUtils.isNotEmpty(kerberosKeytab);
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            ugi = UserGroupInformation.createProxyUser(username, delegate.getUGI());
+        }
+        return ugi;
     }
 }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
index 722cd0352b7..adf76274386 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
@@ -18,14 +18,17 @@
 package org.apache.doris.common.security.authentication;
 
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
 public class KerberosAuthenticationConfig extends AuthenticationConfig {
     private String kerberosPrincipal;
     private String kerberosKeytab;
     private Configuration conf;
+    private boolean printDebugLog;
 
     @Override
     public boolean isValid() {
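ImpersonatingHadoopAuthenticator composes with either login path: it lazily wraps the delegate's UGI in a Hadoop proxy user. A hedged sketch of that composition follows; the helper names are invented, and the end-user mapping must still be allowed on the cluster side via the standard hadoop.proxyuser.* settings.

import org.apache.hadoop.security.UserGroupInformation;

import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.common.security.authentication.ImpersonatingHadoopAuthenticator;

public class ImpersonationSketch {
    // Wrap any real authenticator (Kerberos or simple) so that actions run as
    // endUser, with the delegate's identity as the real user underneath.
    static HadoopAuthenticator impersonate(HadoopAuthenticator delegate, String endUser) {
        return new ImpersonatingHadoopAuthenticator(delegate, endUser);
    }

    static String effectiveUser(HadoopAuthenticator auth) throws Exception {
        UserGroupInformation ugi = auth.getUGI();
        // For a proxy user, getUserName() reports the end user while
        // getRealUser() reports the delegate's principal.
        return ugi.getUserName();
    }
}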
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index 1ac77972053..a5e0eefb348 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -113,6 +114,10 @@ public interface HMSCachedClient {
 
     void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
 
+    default void setHadoopAuthenticator(HadoopAuthenticator hadoopAuthenticator) {
+        // Ignored by default
+    }
+
     /**
      * close the connection, eg, to hms
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 91192b63c24..a22eacaf1e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -24,7 +24,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
@@ -40,7 +40,9 @@ import org.apache.doris.fs.FileSystemProviderImpl;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
@@ -68,7 +70,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
     private ThreadPoolExecutor fileSystemExecutor;
 
+    @Getter
+    private HadoopAuthenticator authenticator;
 
+    @VisibleForTesting
     public HMSExternalCatalog() {
         catalogProperty = new CatalogProperty(null, null);
     }
@@ -81,6 +86,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
         super(catalogId, name, InitCatalogLog.Type.HMS, comment);
         props = PropertyConverter.convertToMetaProperties(props);
         catalogProperty = new CatalogProperty(resource, props);
+        AuthenticationConfig config = AuthenticationConfig.getKerberosConfig(getConfiguration());
+        authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
     }
 
     @Override
@@ -148,9 +155,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
         }
         hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
                 String.valueOf(Config.hive_metastore_client_timeout_second));
-        HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf,
-                AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
-                AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
     }
     HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
     FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 9f9fbebe28d..dcfc6d1ad33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -33,6 +33,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -61,11 +62,14 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     private final HMSCachedClient client;
     private final HMSExternalCatalog catalog;
+    private HadoopAuthenticator hadoopAuthenticator;
 
     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
         this(catalog, createCachedClient(hiveConf,
                 Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size),
                 jdbcClientConfig));
+        hadoopAuthenticator = catalog.getAuthenticator();
+        client.setHadoopAuthenticator(hadoopAuthenticator);
     }
 
     @VisibleForTesting
@@ -85,7 +89,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
             JdbcClientConfig jdbcClientConfig) {
         if (hiveConf != null) {
-            return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
+            ThriftHMSCachedClient client = new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
+            return client;
         }
         Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null");
         String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl());
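The wiring this commit sets up between catalog, metadata ops, and cached client reduces to two calls. A sketch using the names from this diff; the glue method below is hypothetical, not part of the commit:

import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HMSExternalCatalog;

public class WiringSketch {
    // Mirrors the tail of HiveMetadataOps' constructor: fetch the catalog-level
    // authenticator (built once in HMSExternalCatalog's constructor) and inject
    // it into the cached client.
    static void wire(HMSExternalCatalog catalog, HMSCachedClient client) {
        HadoopAuthenticator auth = catalog.getAuthenticator();
        // setHadoopAuthenticator is a default interface method, so client
        // implementations that don't need authentication can simply ignore it.
        client.setHadoopAuthenticator(auth);
    }
}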
hadoopAuthenticator; + } + @Override public void close() { synchronized (clientPool) { @@ -678,7 +684,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient { } private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) { - return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action); + try { + return hadoopAuthenticator.doAs(action); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index 68de3a8fdef..3cb8a036c2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Set; @@ -56,7 +57,7 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { try { org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath); Path locatedPath = new Path(remotePath); - RemoteIterator<LocatedFileStatus> locatedFiles = fileSystem.listFiles(locatedPath, recursive); + RemoteIterator<LocatedFileStatus> locatedFiles = getLocatedFiles(recursive, fileSystem, locatedPath); while (locatedFiles.hasNext()) { LocatedFileStatus fileStatus = locatedFiles.next(); RemoteFile location = new RemoteFile( @@ -72,11 +73,16 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { return Status.OK; } + protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive, + FileSystem fileSystem, Path locatedPath) throws IOException { + return fileSystem.listFiles(locatedPath, recursive); + } + @Override public Status listDirectories(String remotePath, Set<String> result) { try { FileSystem fileSystem = nativeFileSystem(remotePath); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath)); + FileStatus[] fileStatuses = getFileStatuses(remotePath, fileSystem); result.addAll( Arrays.stream(fileStatuses) .filter(FileStatus::isDirectory) @@ -88,6 +94,10 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { return Status.OK; } + protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException { + return fileSystem.listStatus(new Path(remotePath)); + } + @Override public Status renameDir(String origFilePath, String destFilePath, diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 944051e8741..59fbd73bda7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -21,7 +21,7 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.security.authentication.AuthenticationConfig; -import org.apache.doris.common.security.authentication.HadoopUGI; +import org.apache.doris.common.security.authentication.HadoopAuthenticator; import org.apache.doris.common.util.URI; import org.apache.doris.fs.operations.HDFSFileOperations; import org.apache.doris.fs.operations.HDFSOpParams; @@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import 
org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,8 +60,8 @@ public class DFSFileSystem extends RemoteFileSystem { public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed"; private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class); - private HDFSFileOperations operations = null; + private HadoopAuthenticator authenticator = null; public DFSFileSystem(Map<String, String> properties) { this(StorageBackend.StorageType.HDFS, properties); @@ -80,21 +82,35 @@ public class DFSFileSystem extends RemoteFileSystem { for (Map.Entry<String, String> propEntry : properties.entrySet()) { conf.set(propEntry.getKey(), propEntry.getValue()); } - - dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> { - try { - return FileSystem.get(new Path(remotePath).toUri(), conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf); + authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig); + try { + dfsFileSystem = authenticator.doAs(() -> { + try { + return FileSystem.get(new Path(remotePath).toUri(), conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (Exception e) { + throw new UserException(e); + } + operations = new HDFSFileOperations(dfsFileSystem); } } } - operations = new HDFSFileOperations(dfsFileSystem); return dfsFileSystem; } + protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive, + FileSystem fileSystem, Path locatedPath) throws IOException { + return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive)); + } + + protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException { + return authenticator.doAs(() -> fileSystem.listStatus(new Path(remotePath))); + } + public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) { Configuration hdfsConf = new HdfsConfiguration(); if (fallbackToSimpleAuth) { @@ -266,7 +282,7 @@ public class DFSFileSystem extends RemoteFileSystem { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); FileSystem fileSystem = nativeFileSystem(remotePath); - boolean isPathExist = fileSystem.exists(inputFilePath); + boolean isPathExist = authenticator.doAs(() -> fileSystem.exists(inputFilePath)); if (!isPathExist) { return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); } @@ -381,7 +397,7 @@ public class DFSFileSystem extends RemoteFileSystem { FileSystem fileSystem = nativeFileSystem(destPath); Path srcfilePath = new Path(srcPathUri.getPath()); Path destfilePath = new Path(destPathUri.getPath()); - boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath); + boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath)); if (!isRenameSuccess) { return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath); } @@ -402,7 +418,7 @@ public class DFSFileSystem extends RemoteFileSystem { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); 
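The two protected methods just added are template-method hooks: the base class performs plain listings, while DFSFileSystem (next hunk) overrides them to run the same calls inside authenticator.doAs. A toy, self-contained reduction of the pattern; all class and method names here are invented for illustration:

import java.security.PrivilegedExceptionAction;

// Stand-ins: BaseLister plays RemoteFileSystem, SecureLister plays
// DFSFileSystem, and Auth plays HadoopAuthenticator's doAs.
interface Auth {
    <T> T doAs(PrivilegedExceptionAction<T> action) throws Exception;
}

class BaseLister {
    protected String[] listDir(String path) throws Exception {
        return new String[] {path + "/part-0"}; // unauthenticated default
    }
}

class SecureLister extends BaseLister {
    private final Auth auth;

    SecureLister(Auth auth) {
        this.auth = auth;
    }

    @Override
    protected String[] listDir(String path) throws Exception {
        // Same operation, but executed under the login user's Subject.
        return auth.doAs(() -> super.listDir(path));
    }
}

The payoff is that shared logic such as listLocatedFiles stays in the base class while only the privileged boundary is overridden per file-system implementation.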
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 944051e8741..59fbd73bda7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -21,7 +21,7 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.URI;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
@@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -58,8 +60,8 @@ public class DFSFileSystem extends RemoteFileSystem {
     public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
     private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
-
     private HDFSFileOperations operations = null;
+    private HadoopAuthenticator authenticator = null;
 
     public DFSFileSystem(Map<String, String> properties) {
         this(StorageBackend.StorageType.HDFS, properties);
@@ -80,21 +82,35 @@ public class DFSFileSystem extends RemoteFileSystem {
                     for (Map.Entry<String, String> propEntry : properties.entrySet()) {
                         conf.set(propEntry.getKey(), propEntry.getValue());
                     }
-
-                    dfsFileSystem = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
-                        try {
-                            return FileSystem.get(new Path(remotePath).toUri(), conf);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+                    AuthenticationConfig authConfig = AuthenticationConfig.getKerberosConfig(conf);
+                    authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConfig);
+                    try {
+                        dfsFileSystem = authenticator.doAs(() -> {
+                            try {
+                                return FileSystem.get(new Path(remotePath).toUri(), conf);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+                    } catch (Exception e) {
+                        throw new UserException(e);
+                    }
+                    operations = new HDFSFileOperations(dfsFileSystem);
                 }
             }
         }
-        operations = new HDFSFileOperations(dfsFileSystem);
         return dfsFileSystem;
     }
 
+    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive,
+            FileSystem fileSystem, Path locatedPath) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, recursive));
+    }
+
+    protected FileStatus[] getFileStatuses(String remotePath, FileSystem fileSystem) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listStatus(new Path(remotePath)));
+    }
+
     public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
         Configuration hdfsConf = new HdfsConfiguration();
         if (fallbackToSimpleAuth) {
@@ -266,7 +282,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            boolean isPathExist = fileSystem.exists(inputFilePath);
+            boolean isPathExist = authenticator.doAs(() -> fileSystem.exists(inputFilePath));
             if (!isPathExist) {
                 return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
             }
@@ -381,7 +397,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             FileSystem fileSystem = nativeFileSystem(destPath);
             Path srcfilePath = new Path(srcPathUri.getPath());
             Path destfilePath = new Path(destPathUri.getPath());
-            boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath);
+            boolean isRenameSuccess = authenticator.doAs(() -> fileSystem.rename(srcfilePath, destfilePath));
             if (!isRenameSuccess) {
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath);
             }
@@ -402,7 +418,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            fileSystem.delete(inputFilePath, true);
+            authenticator.doAs(() -> fileSystem.delete(inputFilePath, true));
         } catch (UserException e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
         } catch (IOException e) {
@@ -428,7 +444,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             FileSystem fileSystem = nativeFileSystem(remotePath);
             Path pathPattern = new Path(pathUri.getPath());
-            FileStatus[] files = fileSystem.globStatus(pathPattern);
+            FileStatus[] files = authenticator.doAs(() -> fileSystem.globStatus(pathPattern));
             if (files == null) {
                 LOG.info("no files in path " + remotePath);
                 return Status.OK;
@@ -455,7 +471,7 @@ public class DFSFileSystem extends RemoteFileSystem {
     public Status makeDir(String remotePath) {
         try {
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            if (!fileSystem.mkdirs(new Path(remotePath))) {
+            if (!authenticator.doAs(() -> fileSystem.mkdirs(new Path(remotePath)))) {
                 LOG.warn("failed to make dir for " + remotePath);
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath);
             }
diff --git a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
index a3b39d1221a..7e7f276236a 100644
--- a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
+++ b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
@@ -1,3 +1,5 @@
+import groovyjarjarantlr4.v4.codegen.model.ExceptionClause
+
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,6 +17,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.junit.Assert;
+
 suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_docker_kerberos") {
     String enabled = context.config.otherConfigs.get("enableKerberosTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -66,7 +70,36 @@ suite("test_two_hive_kerberos", "p0,external,kerberos,external_docker,external_d
         sql """ use test_krb_hive_db """
         order_qt_q02 """ select * from test_krb_hive_db.test_krb_hive_tbl """
 
+        // 3. multi thread test
+        Thread thread1 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from ${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+
+        Thread thread2 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from other_${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+        sleep(5000L)
+        thread1.start()
+        thread2.start()
+
+        thread1.join()
+        thread2.join()
         sql """drop catalog ${hms_catalog_name};"""
         sql """drop catalog other_${hms_catalog_name};"""
+        // TODO: add tvf case
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org