This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b4af6713478 branch-2.1: [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS #47192 (#47694)
b4af6713478 is described below
commit b4af671347847f0fce09a2715967320ccf0301ee
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 12 09:16:30 2025 +0800
branch-2.1: [Fix](catalog)Fixes query failures for Paimon tables stored in Kerberized HDFS #47192 (#47694)
Cherry-picked from #47192
Co-authored-by: Calvin Kirs <[email protected]>
---
.../apache/doris/hudi/HadoopHudiJniScanner.java | 55 ++++++++------
fe/be-java-extensions/paimon-scanner/pom.xml | 6 --
.../org/apache/doris/paimon/PaimonJniScanner.java | 30 ++++++--
.../authentication/AuthenticationConfig.java | 17 ++++-
.../PreExecutionAuthenticatorCache.java | 87 ++++++++++++++++++++++
5 files changed, 159 insertions(+), 36 deletions(-)
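The core of the change: every JNI scanner that touches HDFS now obtains a PreExecutionAuthenticator from a shared cache and funnels its filesystem work through it, so reads against Kerberized HDFS run under the proper login context. A minimal caller-side sketch of the pattern (the execute(Callable) signature is inferred from the hunks below; that it wraps the task via UserGroupInformation.doAs internally is an assumption, not shown in this diff):

    import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
    import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

    // Hypothetical caller; names mirror the diff below.
    int scanBatch(Map<String, String> hadoopProps) throws Exception {
        // One authenticator per distinct Hadoop config, served from the cache.
        PreExecutionAuthenticator auth =
                PreExecutionAuthenticatorCache.getAuthenticator(hadoopProps);
        // execute(...) runs the task inside the authenticated context
        // and returns its result.
        return auth.execute(() -> {
            // Any HDFS access here carries the Kerberos (or simple) identity.
            return 0; // placeholder for the real batch read
        });
    }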
diff --git a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
index f2b38815a36..f163be11aa2 100644
--- a/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
+++ b/fe/be-java-extensions/hadoop-hudi-scanner/src/main/java/org/apache/doris/hudi/HadoopHudiJniScanner.java
@@ -20,6 +20,8 @@ package org.apache.doris.hudi;
import org.apache.doris.common.classloader.ThreadClassLoaderContext;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
private final int fetchSize;
private final ClassLoader classLoader;
+ private final PreExecutionAuthenticator preExecutionAuthenticator;
+
public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
this.basePath = params.get("base_path");
this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@ public class HadoopHudiJniScanner extends JniScanner {
LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
}
}
+ this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);
ZoneId zoneId;
if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@ public class HadoopHudiJniScanner extends JniScanner {
@Override
public void open() throws IOException {
try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
- initRequiredColumnsAndTypes();
- initTableInfo(requiredTypes, requiredFields, fetchSize);
- Properties properties = getReaderProperties();
- initReader(properties);
+ preExecutionAuthenticator.execute(() -> {
+ initRequiredColumnsAndTypes();
+ initTableInfo(requiredTypes, requiredFields, fetchSize);
+ Properties properties = getReaderProperties();
+ initReader(properties);
+ return null;
+ });
+
} catch (Exception e) {
close();
LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@ public class HadoopHudiJniScanner extends JniScanner {
@Override
public int getNext() throws IOException {
try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
- NullWritable key = reader.createKey();
- ArrayWritable value = reader.createValue();
- int numRows = 0;
- for (; numRows < fetchSize; numRows++) {
- if (!reader.next(key, value)) {
- break;
+ return preExecutionAuthenticator.execute(() -> {
+ NullWritable key = reader.createKey();
+ ArrayWritable value = reader.createValue();
+ int numRows = 0;
+ for (; numRows < fetchSize; numRows++) {
+ if (!reader.next(key, value)) {
+ break;
+ }
+ Object rowData = deserializer.deserialize(value);
+ for (int i = 0; i < fields.length; i++) {
+ Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
+ columnValue.setRow(fieldData);
+ // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
+ //     numRows, i, types[i].getName(), types[i].getType().name(),
+ //     fieldInspectors[i].getTypeName());
+ columnValue.setField(types[i], fieldInspectors[i]);
+ appendData(i, columnValue);
+ }
}
- Object rowData = deserializer.deserialize(value);
- for (int i = 0; i < fields.length; i++) {
- Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
- columnValue.setRow(fieldData);
- // LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
- //     numRows, i, types[i].getName(), types[i].getType().name(),
- //     fieldInspectors[i].getTypeName());
- columnValue.setField(types[i], fieldInspectors[i]);
- appendData(i, columnValue);
- }
- }
- return numRows;
+ return numRows;
+ });
} catch (Exception e) {
close();
LOG.warn("failed to get next in hadoop hudi jni scanner", e);
diff --git a/fe/be-java-extensions/paimon-scanner/pom.xml b/fe/be-java-extensions/paimon-scanner/pom.xml
index 0b513691303..3b4eb7acfe3 100644
--- a/fe/be-java-extensions/paimon-scanner/pom.xml
+++ b/fe/be-java-extensions/paimon-scanner/pom.xml
@@ -39,12 +39,6 @@ under the License.
<groupId>org.apache.doris</groupId>
<artifactId>java-common</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <artifactId>fe-common</artifactId>
- <groupId>org.apache.doris</groupId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index 6ffd5f1ad90..e6c04a0a2f7 100644
--- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -20,6 +20,8 @@ package org.apache.doris.paimon;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
import org.apache.doris.paimon.PaimonTableCache.TableExt;
@@ -74,6 +76,7 @@ public class PaimonJniScanner extends JniScanner {
private long lastUpdateTime;
private RecordReader.RecordIterator<InternalRow> recordIterator = null;
private final ClassLoader classLoader;
+ private PreExecutionAuthenticator preExecutionAuthenticator;
public PaimonJniScanner(int batchSize, Map<String, String> params) {
this.classLoader = this.getClass().getClassLoader();
@@ -104,6 +107,7 @@ public class PaimonJniScanner extends JniScanner {
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
.collect(Collectors
.toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+ this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
}
@Override
@@ -114,12 +118,16 @@ public class PaimonJniScanner extends JniScanner {
// `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
// so we need to provide a classloader, otherwise it will cause NPE.
Thread.currentThread().setContextClassLoader(classLoader);
- initTable();
- initReader();
+ preExecutionAuthenticator.execute(() -> {
+ initTable();
+ initReader();
+ return null;
+ });
resetDatetimeV2Precision();
+
} catch (Throwable e) {
LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e);
- throw e;
+ throw new RuntimeException(e);
}
}
@@ -137,7 +145,7 @@ public class PaimonJniScanner extends JniScanner {
readBuilder.withFilter(getPredicates());
reader = readBuilder.newRead().executeFilter().createReader(getSplit());
paimonDataTypeList =
- Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
+ Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
}
private int[] getProjected() {
@@ -183,8 +191,7 @@ public class PaimonJniScanner extends JniScanner {
}
}
- @Override
- protected int getNext() throws IOException {
+ private int readAndProcessNextBatch() throws IOException {
int rows = 0;
try {
if (recordIterator == null) {
@@ -210,13 +217,22 @@ public class PaimonJniScanner extends JniScanner {
} catch (Exception e) {
close();
LOG.warn("Failed to get the next batch of paimon. "
- + "split: {}, requiredFieldNames: {}, paimonAllFieldNames:
{}, dataType: {}",
+ + "split: {}, requiredFieldNames: {},
paimonAllFieldNames: {}, dataType: {}",
getSplit(), params.get("required_fields"),
paimonAllFieldNames, paimonDataTypeList, e);
throw new IOException(e);
}
return rows;
}
+ @Override
+ protected int getNext() {
+ try {
+ return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
protected TableSchema parseTableSchema() throws UnsupportedOperationException {
// do nothing
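The Paimon scanner gets the same treatment, with one structural twist visible above: the batch-reading logic moves into readAndProcessNextBatch(), and the overriding getNext() becomes a thin authenticated wrapper. Because the new override drops the throws IOException clause, checked exceptions must be rethrown unchecked; condensed from the hunk above:

    @Override
    protected int getNext() {
        try {
            // Same read path as before; only the execution context changes.
            return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
        } catch (Exception e) {
            throw new RuntimeException(e); // signature no longer declares IOException
        }
    }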
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
index b580f9ecbe0..2fa8d09b0d7 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/AuthenticationConfig.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Map;
+
public abstract class AuthenticationConfig {
private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class);
public static String HADOOP_USER_NAME = "hadoop.username";
@@ -31,12 +33,24 @@ public abstract class AuthenticationConfig {
public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
+ private static final String DEFAULT_HADOOP_USERNAME = "hadoop";
/**
* @return true if the config is valid, otherwise false.
*/
public abstract boolean isValid();
+ protected static String generalAuthenticationConfigKey(Map<String, String> conf) {
+ String authentication = conf.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ null);
+ if (AuthType.KERBEROS.getDesc().equals(authentication)) {
+ return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-"
+ + conf.getOrDefault(DORIS_KRB5_DEBUG, "false");
+ } else {
+ return conf.getOrDefault(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
+ }
+ }
+
/**
* get kerberos config from hadoop conf
* @param conf config
@@ -90,7 +104,8 @@ public abstract class AuthenticationConfig {
private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) {
// AuthType.SIMPLE
SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig();
- simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME));
+ String hadoopUserName = conf.get(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
+ simpleAuthenticationConfig.setUsername(hadoopUserName);
return simpleAuthenticationConfig;
}
}
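generalAuthenticationConfigKey reduces a configuration map to a stable cache key built only from the fields that determine the login identity. Worked examples (the principal and keytab values are hypothetical; the constants hold the actual Hadoop property names, e.g. HADOOP_USER_NAME is "hadoop.username" per the hunk above):

    // Kerberos: key = principal + "-" + keytab + "-" + krb5-debug flag
    //   principal "doris/[email protected]", keytab "/etc/doris.keytab"
    //   -> "doris/[email protected]/etc/doris.keytab-false"
    //
    // Simple auth: key = the configured hadoop.username, else the default
    //   empty map -> "hadoop" (DEFAULT_HADOOP_USERNAME)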
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
new file mode 100644
index 00000000000..5b0d1cb70ff
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/security/authentication/PreExecutionAuthenticatorCache.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.security.authentication;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations.
+ * This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration.
+ * It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache
+ * exceeds the maximum size (MAX_CACHE_SIZE).
+ * <p>
+ * The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
+ * only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
+ * redundant instantiations.
+ */
+public class PreExecutionAuthenticatorCache {
+ private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
+ private static final int MAX_CACHE_SIZE = 100;
+
+ private static final Cache<String, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
+ CacheBuilder.newBuilder()
+ .maximumSize(MAX_CACHE_SIZE)
+ .expireAfterAccess(60 * 24, TimeUnit.MINUTES)
+ .build();
+
+ /**
+  * Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist.
+  * This method first checks if the configuration is already cached. If not, it computes a new instance and
+  * caches it for future use.
+  *
+  * @param hadoopConfig The Hadoop configuration (key-value pairs)
+  * @return A PreExecutionAuthenticator instance for the given configuration
+  */
+ public static PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
+ String authenticatorCacheKey = AuthenticationConfig.generalAuthenticationConfigKey(hadoopConfig);
+ PreExecutionAuthenticator authenticator;
+ try {
+ authenticator = preExecutionAuthenticatorCache.get(authenticatorCacheKey,
+ () -> createAuthenticator(hadoopConfig, authenticatorCacheKey));
+ } catch (ExecutionException exception) {
+ throw new RuntimeException("Failed to create PreExecutionAuthenticator for key: " + authenticatorCacheKey,
+ exception);
+ }
+ return authenticator;
+ }
+
+ private static PreExecutionAuthenticator createAuthenticator(Map<String, String> hadoopConfig,
+ String authenticatorCacheKey) {
+ Configuration conf = new Configuration();
+ hadoopConfig.forEach(conf::set);
+ PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
+ AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(
+ conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
+ AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
+ HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
+ .getHadoopAuthenticator(authenticationConfig);
+ preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
+ LOG.info("Creating new PreExecutionAuthenticator for configuration, Cache key: {}",
+ authenticatorCacheKey);
+ return preExecutionAuthenticator;
+ }
+
+}
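Usage sketch for the new cache: callers that pass equal configuration maps share a single authenticator instance, because the Guava cache keys on the derived string (at most 100 entries, each evicted 24 hours after last access):

    Map<String, String> props = new HashMap<>();   // same contents => same key
    PreExecutionAuthenticator a = PreExecutionAuthenticatorCache.getAuthenticator(props);
    PreExecutionAuthenticator b = PreExecutionAuthenticatorCache.getAuthenticator(props);
    assert a == b; // served from the same cache entry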
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]