This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b0943064e04 [fix](kerberos)fix and refactor ugi login for kerberos and 
simple authentication (#38607)
b0943064e04 is described below

commit b0943064e0451b1f2302653313e8cba1aa663f9b
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Thu Aug 1 14:01:32 2024 +0800

    [fix](kerberos)fix and refactor ugi login for kerberos and simple 
authentication (#38607)
    
    pick from  (#37301)
---
 .../kerberos/common/conf/doris-krb5.conf           |   2 +-
 .../authentication/AuthenticationConfig.java       |   2 +
 ...icationConfig.java => HadoopAuthenticator.java} |  32 ++--
 .../HadoopKerberosAuthenticator.java               | 192 +++++++++++++++++++++
 .../authentication/HadoopSimpleAuthenticator.java  |  47 +++++
 .../common/security/authentication/HadoopUGI.java  |  89 ++--------
 ....java => ImpersonatingHadoopAuthenticator.java} |  29 ++--
 .../KerberosAuthenticationConfig.java              |   3 +
 .../doris/datasource/hive/HMSCachedClient.java     |   5 +
 .../doris/datasource/hive/HMSExternalCatalog.java  |  12 +-
 .../doris/datasource/hive/HiveMetadataOps.java     |   7 +-
 .../datasource/hive/ThriftHMSCachedClient.java     |  12 +-
 .../apache/doris/fs/remote/RemoteFileSystem.java   |  14 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  48 ++++--
 .../kerberos/test_two_hive_kerberos.groovy         |  33 ++++
 15 files changed, 410 insertions(+), 117 deletions(-)

diff --git 
a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf 
b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
index 7624b94e6ad..36547b8f89d 100644
--- a/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
+++ b/docker/thirdparties/docker-compose/kerberos/common/conf/doris-krb5.conf
@@ -24,7 +24,7 @@
  default_realm = LABS.TERADATA.COM
  dns_lookup_realm = false
  dns_lookup_kdc = false
- ticket_lifetime = 24h
+ ticket_lifetime = 5s
  # this setting is causing a Message stream modified (41) error when talking 
to KDC running on CentOS 7: https://stackoverflow.com/a/60978520
  # renew_lifetime = 7d
  forwardable = true
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
index 32a27b2263a..875ae4542e1 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
@@ -26,6 +26,7 @@ public abstract class AuthenticationConfig {
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
     public static String HIVE_KERBEROS_PRINCIPAL = 
"hive.metastore.kerberos.principal";
     public static String HIVE_KERBEROS_KEYTAB = 
"hive.metastore.kerberos.keytab.file";
+    public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
 
     /**
      * @return true if the config is valid, otherwise false.
@@ -57,6 +58,7 @@ public abstract class AuthenticationConfig {
             krbConfig.setKerberosPrincipal(conf.get(krbPrincipalKey));
             krbConfig.setKerberosKeytab(conf.get(krbKeytabKey));
             krbConfig.setConf(conf);
+            
krbConfig.setPrintDebugLog(Boolean.parseBoolean(conf.get(DORIS_KRB5_DEBUG, 
"false")));
             return krbConfig;
         } else {
             // AuthType.SIMPLE
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
similarity index 51%
copy from 
fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
copy to 
fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
index 722cd0352b7..c3cab5f410b 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopAuthenticator.java
@@ -17,18 +17,28 @@
 
 package org.apache.doris.common.security.authentication;
 
-import lombok.Data;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 
-@Data
-public class KerberosAuthenticationConfig extends AuthenticationConfig {
-    private String kerberosPrincipal;
-    private String kerberosKeytab;
-    private Configuration conf;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
-    @Override
-    public boolean isValid() {
-        return StringUtils.isNotEmpty(kerberosPrincipal) && 
StringUtils.isNotEmpty(kerberosKeytab);
+public interface HadoopAuthenticator {
+
+    UserGroupInformation getUGI() throws IOException;
+
+    default <T> T doAs(PrivilegedExceptionAction<T> action) throws IOException 
{
+        try {
+            return getUGI().doAs(action);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
+
+    static HadoopAuthenticator getHadoopAuthenticator(AuthenticationConfig 
config) {
+        if (config instanceof KerberosAuthenticationConfig) {
+            return new 
HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config);
+        } else {
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) 
config);
+        }
     }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
new file mode 100644
index 00000000000..90c7927f5a6
--- /dev/null
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopKerberosAuthenticator.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+public class HadoopKerberosAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = 
LogManager.getLogger(HadoopKerberosAuthenticator.class);
+    private final KerberosAuthenticationConfig config;
+    private Subject subject;
+    private long nextRefreshTime;
+    private UserGroupInformation ugi;
+
+    public HadoopKerberosAuthenticator(KerberosAuthenticationConfig config) {
+        this.config = config;
+    }
+
+    public static void initializeAuthConfig(Configuration hadoopConf) {
+        
hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
"true");
+        synchronized (HadoopKerberosAuthenticator.class) {
+            // avoid other catalog set conf at the same time
+            UserGroupInformation.setConfiguration(hadoopConf);
+        }
+    }
+
+    @Override
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            subject = getSubject(config.getKerberosKeytab(), 
config.getKerberosPrincipal(), config.isPrintDebugLog());
+            ugi = Objects.requireNonNull(login(subject), "login result is 
null");
+            return ugi;
+        }
+        if (nextRefreshTime < System.currentTimeMillis()) {
+            long lastRefreshTime = nextRefreshTime;
+            Subject existingSubject = subject;
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(subject);
+                LOG.debug("Current ticket expired time is {}", 
lastTicketEndTime);
+            }
+            // renew subject
+            Subject newSubject = getSubject(config.getKerberosKeytab(), 
config.getKerberosPrincipal(),
+                    config.isPrintDebugLog());
+            Objects.requireNonNull(login(newSubject), "re-login result is 
null");
+            // modify UGI instead of returning new UGI
+            existingSubject.getPrincipals().addAll(newSubject.getPrincipals());
+            Set<Object> privateCredentials = 
existingSubject.getPrivateCredentials();
+            // clear the old credentials
+            synchronized (privateCredentials) {
+                privateCredentials.clear();
+                privateCredentials.addAll(newSubject.getPrivateCredentials());
+            }
+            Set<Object> publicCredentials = 
existingSubject.getPublicCredentials();
+            synchronized (publicCredentials) {
+                publicCredentials.clear();
+                publicCredentials.addAll(newSubject.getPublicCredentials());
+            }
+            nextRefreshTime = calculateNextRefreshTime(newSubject);
+            if (LOG.isDebugEnabled()) {
+                Date lastTicketEndTime = getTicketEndTime(newSubject);
+                LOG.debug("Next ticket expired time is {}", lastTicketEndTime);
+                LOG.debug("Refresh kerberos ticket succeeded, last time is {}, 
next time is {}",
+                        lastRefreshTime, nextRefreshTime);
+            }
+        }
+        return ugi;
+    }
+
+    private UserGroupInformation login(Subject subject) throws IOException {
+        // login and get ugi when catalog is initialized
+        initializeAuthConfig(config.getConf());
+        String principal = config.getKerberosPrincipal();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by kerberos authentication with principal: {}", 
principal);
+        }
+        return UserGroupInformation.getUGIFromSubject(subject);
+    }
+
+    private static long calculateNextRefreshTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present 
in kerberos based UGI");
+        KerberosTicket tgtTicket = getTicketGrantingTicket(subject);
+        return getRefreshTime(tgtTicket);
+    }
+
+    private static Date getTicketEndTime(Subject subject) {
+        Preconditions.checkArgument(subject != null, "subject must be present 
in kerberos based UGI");
+        KerberosTicket tgtTicket = getTicketGrantingTicket(subject);
+        return tgtTicket.getEndTime();
+    }
+
+    public static KerberosTicket getTicketGrantingTicket(Subject subject) {
+        Set<KerberosTicket> tickets = 
subject.getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            if (isOriginalTicketGrantingTicket(ticket)) {
+                return ticket;
+            }
+        }
+        throw new IllegalArgumentException("kerberos ticket not found in " + 
subject);
+    }
+
+    public static boolean isOriginalTicketGrantingTicket(KerberosTicket 
ticket) {
+        return isTicketGrantingServerPrincipal(ticket.getServer());
+    }
+
+    private static boolean isTicketGrantingServerPrincipal(KerberosPrincipal 
principal) {
+        if (principal == null) {
+            return false;
+        }
+        if (principal.getName().equals("krbtgt/" + principal.getRealm() + "@" 
+ principal.getRealm())) {
+            return true;
+        }
+        return false;
+    }
+
+    public static long getRefreshTime(KerberosTicket ticket) {
+        long start = ticket.getStartTime().getTime();
+        long end = ticket.getEndTime().getTime();
+        return start + (long) ((end - start) * 0.8f);
+    }
+
+    private static Subject getSubject(String keytab, String principal, boolean 
printDebugLog) {
+        Subject subject = new Subject(false, ImmutableSet.of(new 
KerberosPrincipal(principal)),
+                Collections.emptySet(), Collections.emptySet());
+        javax.security.auth.login.Configuration conf = 
getConfiguration(keytab, principal, printDebugLog);
+        try {
+            LoginContext loginContext = new LoginContext("", subject, null, 
conf);
+            loginContext.login();
+            return loginContext.getSubject();
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static javax.security.auth.login.Configuration 
getConfiguration(String keytab, String principal,
+                                                                            
boolean printDebugLog) {
+        return new javax.security.auth.login.Configuration() {
+            @Override
+            public AppConfigurationEntry[] getAppConfigurationEntry(String 
name) {
+                ImmutableMap.Builder<String, String> builder = 
ImmutableMap.<String, String>builder()
+                        .put("doNotPrompt", "true")
+                        .put("isInitiator", "true")
+                        .put("useKeyTab", "true")
+                        .put("storeKey", "true")
+                        .put("keyTab", keytab)
+                        .put("principal", principal);
+                if (printDebugLog) {
+                    builder.put("debug", "true");
+                }
+                Map<String, String> options = builder.build();
+                return new AppConfigurationEntry[]{
+                    new AppConfigurationEntry(
+                        "com.sun.security.auth.module.Krb5LoginModule",
+                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                        options)};
+            }
+        };
+    }
+}
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
new file mode 100644
index 00000000000..fbe0d0aba7d
--- /dev/null
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopSimpleAuthenticator.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class HadoopSimpleAuthenticator implements HadoopAuthenticator {
+    private static final Logger LOG = 
LogManager.getLogger(HadoopSimpleAuthenticator.class);
+    private final UserGroupInformation ugi;
+
+    public HadoopSimpleAuthenticator(SimpleAuthenticationConfig config) {
+        String hadoopUserName = config.getUsername();
+        if (hadoopUserName == null) {
+            hadoopUserName = "hadoop";
+            config.setUsername(hadoopUserName);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} is unset, use default user: hadoop", 
AuthenticationConfig.HADOOP_USER_NAME);
+            }
+        }
+        ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Login by proxy user, hadoop.username: {}", 
hadoopUserName);
+        }
+    }
+
+    @Override
+    public UserGroupInformation getUGI() {
+        return ugi;
+    }
+}
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
index 1a86b9e327a..d04d772728b 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/HadoopUGI.java
@@ -18,8 +18,6 @@
 package org.apache.doris.common.security.authentication;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,6 +25,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+@Deprecated
 public class HadoopUGI {
     private static final Logger LOG = LogManager.getLogger(HadoopUGI.class);
 
@@ -39,82 +38,30 @@ public class HadoopUGI {
         if (config == null || !config.isValid()) {
             return null;
         }
-        UserGroupInformation ugi;
         if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = 
(KerberosAuthenticationConfig) config;
-            Configuration hadoopConf = krbConfig.getConf();
-            
hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
"true");
-            
hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED,
 "true");
-            UserGroupInformation.setConfiguration(hadoopConf);
-            String principal = krbConfig.getKerberosPrincipal();
             try {
-                //  login hadoop with keytab and try checking TGT
-                ugi = UserGroupInformation.getLoginUser();
-                LOG.debug("Current login user: {}", ugi.getUserName());
-                if (ugi.hasKerberosCredentials() && 
StringUtils.equals(ugi.getUserName(), principal)) {
-                    // if the current user is logged by kerberos and is the 
same user
-                    // just use checkTGTAndReloginFromKeytab because this 
method will only relogin
-                    // when the TGT is expired or is close to expiry
-                    ugi.checkTGTAndReloginFromKeytab();
-                    return ugi;
+                // TODO: remove after iceberg and hudi kerberos test case pass
+                try {
+                    //  login hadoop with keytab and try checking TGT
+                    UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
+                    LOG.debug("Current login user: {}", ugi.getUserName());
+                    String principal = ((KerberosAuthenticationConfig) 
config).getKerberosPrincipal();
+                    if (ugi.hasKerberosCredentials() && 
StringUtils.equals(ugi.getUserName(), principal)) {
+                        // if the current user is logged by kerberos and is 
the same user
+                        // just use checkTGTAndReloginFromKeytab because this 
method will only relogin
+                        // when the TGT is expired or is close to expiry
+                        ugi.checkTGTAndReloginFromKeytab();
+                        return ugi;
+                    }
+                } catch (IOException e) {
+                    LOG.warn("A SecurityException occurs with kerberos, do 
login immediately.", e);
                 }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with kerberos, do login 
immediately.", e);
-            }
-            try {
-                ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
krbConfig.getKerberosKeytab());
-                UserGroupInformation.setLoginUser(ugi);
-                LOG.debug("Login by kerberos authentication with principal: 
{}", principal);
-                return ugi;
+                return new 
HadoopKerberosAuthenticator((KerberosAuthenticationConfig) config).getUGI();
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
         } else {
-            String hadoopUserName = ((SimpleAuthenticationConfig) 
config).getUsername();
-            if (hadoopUserName == null) {
-                hadoopUserName = "hadoop";
-                ((SimpleAuthenticationConfig) 
config).setUsername(hadoopUserName);
-                LOG.debug(AuthenticationConfig.HADOOP_USER_NAME + " is unset, 
use default user: hadoop");
-            }
-
-            try {
-                ugi = UserGroupInformation.getLoginUser();
-                if (ugi.getUserName().equals(hadoopUserName)) {
-                    return ugi;
-                }
-            } catch (IOException e) {
-                LOG.warn("A SecurityException occurs with simple, do login 
immediately.", e);
-            }
-
-            ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
-            UserGroupInformation.setLoginUser(ugi);
-            LOG.debug("Login by proxy user, hadoop.username: {}", 
hadoopUserName);
-            return ugi;
-        }
-    }
-
-    /**
-     * use for HMSExternalCatalog to login
-     * @param config auth config
-     */
-    public static void tryKrbLogin(String catalogName, AuthenticationConfig 
config) {
-        if (config instanceof KerberosAuthenticationConfig) {
-            KerberosAuthenticationConfig krbConfig = 
(KerberosAuthenticationConfig) config;
-            try {
-                Configuration hadoopConf = krbConfig.getConf();
-                
hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
"true");
-                
hadoopConf.set(CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED,
 "true");
-                UserGroupInformation.setConfiguration(hadoopConf);
-                /**
-                 * Because metastore client is created by using
-                 * {@link 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient#getProxy}
-                 * it will relogin when TGT is expired, so we don't need to 
relogin manually.
-                 */
-                
UserGroupInformation.loginUserFromKeytab(krbConfig.getKerberosPrincipal(),
-                        krbConfig.getKerberosKeytab());
-            } catch (IOException e) {
-                throw new RuntimeException("login with kerberos auth failed 
for catalog: " + catalogName, e);
-            }
+            return new HadoopSimpleAuthenticator((SimpleAuthenticationConfig) 
config).getUGI();
         }
     }
 
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
similarity index 54%
copy from 
fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
copy to 
fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
index 722cd0352b7..10e42f4bc67 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/ImpersonatingHadoopAuthenticator.java
@@ -17,18 +17,27 @@
 
 package org.apache.doris.common.security.authentication;
 
-import lombok.Data;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 
-@Data
-public class KerberosAuthenticationConfig extends AuthenticationConfig {
-    private String kerberosPrincipal;
-    private String kerberosKeytab;
-    private Configuration conf;
+import java.io.IOException;
+import java.util.Objects;
+
+public class ImpersonatingHadoopAuthenticator implements HadoopAuthenticator {
+
+    private final HadoopAuthenticator delegate;
+    private final String username;
+    private UserGroupInformation ugi;
+
+    public ImpersonatingHadoopAuthenticator(HadoopAuthenticator delegate, 
String username) {
+        this.delegate = Objects.requireNonNull(delegate);
+        this.username = Objects.requireNonNull(username);
+    }
 
     @Override
-    public boolean isValid() {
-        return StringUtils.isNotEmpty(kerberosPrincipal) && 
StringUtils.isNotEmpty(kerberosKeytab);
+    public synchronized UserGroupInformation getUGI() throws IOException {
+        if (ugi == null) {
+            ugi = UserGroupInformation.createProxyUser(username, 
delegate.getUGI());
+        }
+        return ugi;
     }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
index 722cd0352b7..adf76274386 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/KerberosAuthenticationConfig.java
@@ -18,14 +18,17 @@
 package org.apache.doris.common.security.authentication;
 
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
 public class KerberosAuthenticationConfig extends AuthenticationConfig {
     private String kerberosPrincipal;
     private String kerberosKeytab;
     private Configuration conf;
+    private boolean printDebugLog;
 
     @Override
     public boolean isValid() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index 1ac77972053..a5e0eefb348 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import 
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -113,6 +114,10 @@ public interface HMSCachedClient {
 
     void dropPartition(String dbName, String tableName, List<String> 
partitionValues, boolean deleteData);
 
+    default void setHadoopAuthenticator(HadoopAuthenticator 
hadoopAuthenticator) {
+        // Ignored by default
+    }
+
     /**
      * close the connection, eg, to hms
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 91192b63c24..a22eacaf1e4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -24,7 +24,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalDatabase;
@@ -40,7 +40,9 @@ import org.apache.doris.fs.FileSystemProviderImpl;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import lombok.Getter;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
@@ -68,7 +70,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
 
     private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
     private ThreadPoolExecutor fileSystemExecutor;
+    @Getter
+    private HadoopAuthenticator authenticator;
 
+    @VisibleForTesting
     public HMSExternalCatalog() {
         catalogProperty = new CatalogProperty(null, null);
     }
@@ -81,6 +86,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
         super(catalogId, name, InitCatalogLog.Type.HMS, comment);
         props = PropertyConverter.convertToMetaProperties(props);
         catalogProperty = new CatalogProperty(resource, props);
+        AuthenticationConfig config = 
AuthenticationConfig.getKerberosConfig(getConfiguration());
+        authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
     }
 
     @Override
@@ -148,9 +155,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
             }
             
hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
                     
String.valueOf(Config.hive_metastore_client_timeout_second));
-            HadoopUGI.tryKrbLogin(this.getName(), 
AuthenticationConfig.getKerberosConfig(hiveConf,
-                    AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
-                    AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
         }
         HiveMetadataOps hiveOps = 
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
         FileSystemProvider fileSystemProvider = new 
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 9f9fbebe28d..dcfc6d1ad33 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -33,6 +33,7 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -61,11 +62,14 @@ public class HiveMetadataOps implements ExternalMetadataOps 
{
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     private final HMSCachedClient client;
     private final HMSExternalCatalog catalog;
+    private HadoopAuthenticator hadoopAuthenticator;
 
     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig 
jdbcClientConfig, HMSExternalCatalog catalog) {
         this(catalog, createCachedClient(hiveConf,
                 Math.max(MIN_CLIENT_POOL_SIZE, 
Config.max_external_cache_loader_thread_pool_size),
                 jdbcClientConfig));
+        hadoopAuthenticator = catalog.getAuthenticator();
+        client.setHadoopAuthenticator(hadoopAuthenticator);
     }
 
     @VisibleForTesting
@@ -85,7 +89,8 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static HMSCachedClient createCachedClient(HiveConf hiveConf, int 
thriftClientPoolSize,
             JdbcClientConfig jdbcClientConfig) {
         if (hiveConf != null) {
-            return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize);
+            ThriftHMSCachedClient client = new ThriftHMSCachedClient(hiveConf, 
thriftClientPoolSize);
+            return client;
         }
         Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and 
jdbcClientConfig are both null");
         String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
index 55d8ffc2e02..7632101d7ff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.DatabaseMetadata;
 import org.apache.doris.datasource.TableMetadata;
 import 
org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -92,6 +93,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient 
{
     private boolean isClosed = false;
     private final int poolSize;
     private final HiveConf hiveConf;
+    private HadoopAuthenticator hadoopAuthenticator;
 
     public ThriftHMSCachedClient(HiveConf hiveConf, int poolSize) {
         Preconditions.checkArgument(poolSize > 0, poolSize);
@@ -104,6 +106,10 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
         this.isClosed = false;
     }
 
+    public void setHadoopAuthenticator(HadoopAuthenticator 
hadoopAuthenticator) {
+        this.hadoopAuthenticator = hadoopAuthenticator;
+    }
+
     @Override
     public void close() {
         synchronized (clientPool) {
@@ -678,7 +684,11 @@ public class ThriftHMSCachedClient implements 
HMSCachedClient {
     }
 
     private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
-        return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
+        try {
+            return hadoopAuthenticator.doAs(action);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index 68de3a8fdef..3cb8a036c2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
@@ -56,7 +57,7 @@ public abstract class RemoteFileSystem extends 
PersistentFileSystem {
         try {
             org.apache.hadoop.fs.FileSystem fileSystem = 
nativeFileSystem(remotePath);
             Path locatedPath = new Path(remotePath);
-            RemoteIterator<LocatedFileStatus> locatedFiles = 
fileSystem.listFiles(locatedPath, recursive);
+            RemoteIterator<LocatedFileStatus> locatedFiles = 
getLocatedFiles(recursive, fileSystem, locatedPath);
             while (locatedFiles.hasNext()) {
                 LocatedFileStatus fileStatus = locatedFiles.next();
                 RemoteFile location = new RemoteFile(
@@ -72,11 +73,16 @@ public abstract class RemoteFileSystem extends 
PersistentFileSystem {
         return Status.OK;
     }
 
+    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean 
recursive,
+                FileSystem fileSystem, Path locatedPath) throws IOException {
+        return fileSystem.listFiles(locatedPath, recursive);
+    }
+
     @Override
     public Status listDirectories(String remotePath, Set<String> result) {
         try {
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            FileStatus[] fileStatuses = fileSystem.listStatus(new 
Path(remotePath));
+            FileStatus[] fileStatuses = getFileStatuses(remotePath, 
fileSystem);
             result.addAll(
                     Arrays.stream(fileStatuses)
                             .filter(FileStatus::isDirectory)
@@ -88,6 +94,10 @@ public abstract class RemoteFileSystem extends 
PersistentFileSystem {
         return Status.OK;
     }
 
+    protected FileStatus[] getFileStatuses(String remotePath, FileSystem 
fileSystem) throws IOException {
+        return fileSystem.listStatus(new Path(remotePath));
+    }
+
     @Override
     public Status renameDir(String origFilePath,
                             String destFilePath,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 944051e8741..59fbd73bda7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -21,7 +21,7 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopUGI;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.URI;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
@@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -58,8 +60,8 @@ public class DFSFileSystem extends RemoteFileSystem {
 
     public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = 
"ipc.client.fallback-to-simple-auth-allowed";
     private static final Logger LOG = 
LogManager.getLogger(DFSFileSystem.class);
-
     private HDFSFileOperations operations = null;
+    private HadoopAuthenticator authenticator = null;
 
     public DFSFileSystem(Map<String, String> properties) {
         this(StorageBackend.StorageType.HDFS, properties);
@@ -80,21 +82,35 @@ public class DFSFileSystem extends RemoteFileSystem {
                     for (Map.Entry<String, String> propEntry : 
properties.entrySet()) {
                         conf.set(propEntry.getKey(), propEntry.getValue());
                     }
-
-                    dfsFileSystem = 
HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(conf), () -> {
-                        try {
-                            return FileSystem.get(new 
Path(remotePath).toUri(), conf);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+                    AuthenticationConfig authConfig = 
AuthenticationConfig.getKerberosConfig(conf);
+                    authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(authConfig);
+                    try {
+                        dfsFileSystem = authenticator.doAs(() -> {
+                            try {
+                                return FileSystem.get(new 
Path(remotePath).toUri(), conf);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+                    } catch (Exception e) {
+                        throw new UserException(e);
+                    }
+                    operations = new HDFSFileOperations(dfsFileSystem);
                 }
             }
         }
-        operations = new HDFSFileOperations(dfsFileSystem);
         return dfsFileSystem;
     }
 
+    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean 
recursive,
+                FileSystem fileSystem, Path locatedPath) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listFiles(locatedPath, 
recursive));
+    }
+
+    protected FileStatus[] getFileStatuses(String remotePath, FileSystem 
fileSystem) throws IOException {
+        return authenticator.doAs(() -> fileSystem.listStatus(new 
Path(remotePath)));
+    }
+
     public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
         Configuration hdfsConf = new HdfsConfiguration();
         if (fallbackToSimpleAuth) {
@@ -266,7 +282,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            boolean isPathExist = fileSystem.exists(inputFilePath);
+            boolean isPathExist = authenticator.doAs(() -> 
fileSystem.exists(inputFilePath));
             if (!isPathExist) {
                 return new Status(Status.ErrCode.NOT_FOUND, "remote path does 
not exist: " + remotePath);
             }
@@ -381,7 +397,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             FileSystem fileSystem = nativeFileSystem(destPath);
             Path srcfilePath = new Path(srcPathUri.getPath());
             Path destfilePath = new Path(destPathUri.getPath());
-            boolean isRenameSuccess = fileSystem.rename(srcfilePath, 
destfilePath);
+            boolean isRenameSuccess = authenticator.doAs(() -> 
fileSystem.rename(srcfilePath, destfilePath));
             if (!isRenameSuccess) {
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to 
rename " + srcPath + " to " + destPath);
             }
@@ -402,7 +418,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             Path inputFilePath = new Path(pathUri.getPath());
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            fileSystem.delete(inputFilePath, true);
+            authenticator.doAs(() -> fileSystem.delete(inputFilePath, true));
         } catch (UserException e) {
             return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
         } catch (IOException e) {
@@ -428,7 +444,7 @@ public class DFSFileSystem extends RemoteFileSystem {
             URI pathUri = URI.create(remotePath);
             FileSystem fileSystem = nativeFileSystem(remotePath);
             Path pathPattern = new Path(pathUri.getPath());
-            FileStatus[] files = fileSystem.globStatus(pathPattern);
+            FileStatus[] files = authenticator.doAs(() -> 
fileSystem.globStatus(pathPattern));
             if (files == null) {
                 LOG.info("no files in path " + remotePath);
                 return Status.OK;
@@ -455,7 +471,7 @@ public class DFSFileSystem extends RemoteFileSystem {
     public Status makeDir(String remotePath) {
         try {
             FileSystem fileSystem = nativeFileSystem(remotePath);
-            if (!fileSystem.mkdirs(new Path(remotePath))) {
+            if (!authenticator.doAs(() -> fileSystem.mkdirs(new 
Path(remotePath)))) {
                 LOG.warn("failed to make dir for " + remotePath);
                 return new Status(Status.ErrCode.COMMON_ERROR, "failed to make 
dir for " + remotePath);
             }
diff --git 
a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
 
b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
index a3b39d1221a..7e7f276236a 100644
--- 
a/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
+++ 
b/regression-test/suites/external_table_p0/kerberos/test_two_hive_kerberos.groovy
@@ -1,3 +1,5 @@
+import groovyjarjarantlr4.v4.codegen.model.ExceptionClause
+
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,6 +17,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.junit.Assert;
+
 suite("test_two_hive_kerberos", 
"p0,external,kerberos,external_docker,external_docker_kerberos") {
     String enabled = context.config.otherConfigs.get("enableKerberosTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -66,7 +70,36 @@ suite("test_two_hive_kerberos", 
"p0,external,kerberos,external_docker,external_d
         sql """ use test_krb_hive_db """
         order_qt_q02 """ select * from test_krb_hive_db.test_krb_hive_tbl """
 
+        // 3. multi thread test
+        Thread thread1 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from 
${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+
+        Thread thread2 = new Thread(() -> {
+            try {
+                for (int i = 0; i < 1000; i++) {
+                    sql """ select * from 
other_${hms_catalog_name}.test_krb_hive_db.test_krb_hive_tbl """
+                }
+            } catch (Exception e) {
+                log.info(e.getMessage())
+                Assert.fail();
+            }
+        })
+        sleep(5000L)
+        thread1.start()
+        thread2.start()
+
+        thread1.join()
+        thread2.join()
         sql """drop catalog ${hms_catalog_name};"""
         sql """drop catalog other_${hms_catalog_name};"""
+        // TODO: add tvf case
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to