This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new d40abdf896 [dev-1.1.2](cherry-pick) fix bug that hive external table can not query table created by Tez (#11603)
d40abdf896 is described below
commit d40abdf8965f93fe1ba2806abe68680141daf627
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Aug 9 08:28:34 2022 +0800
    [dev-1.1.2](cherry-pick) fix bug that hive external table can not query table created by Tez (#11603)
---
.../doris/catalog/HiveMetaStoreClientHelper.java | 207 ++++++++++++---------
1 file changed, 123 insertions(+), 84 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index b49a7e5d7d..0d5bcf9671 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -36,10 +36,13 @@ import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExprOpcode;
+import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -61,8 +64,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
-import com.google.common.base.Strings;
-
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -70,6 +71,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Stack;
/**
@@ -162,7 +164,8 @@ public class HiveMetaStoreClientHelper {
}
/**
- * Get data files of partitions in hive table, filter by partition predicate
+ * Get data files of partitions in hive table, filter by partition predicate.
+ *
* @param hiveTable
* @param hivePartitionPredicate
* @param fileStatuses
@@ -171,34 +174,134 @@ public class HiveMetaStoreClientHelper {
* @throws DdlException
*/
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
-            List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type) throws DdlException {
-        HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
-
+            List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
+            throws DdlException {
+ boolean onS3 = type.equals(StorageBackend.StorageType.S3);
+ Map<String, String> properties = hiveTable.getHiveProperties();
+ Configuration configuration = getConfiguration(properties, onS3);
+ boolean isSecurityEnabled = isSecurityEnabled(properties);
+        String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
+
List<RemoteIterator<LocatedFileStatus>> remoteIterators;
- Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
             // hive partitioned table, get file iterator from table partition sd info
- List<Partition> hivePartitions = new ArrayList<>();
- try {
-                client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(),
-                        SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions);
- } catch (TException e) {
-                LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
- throw new DdlException("Connect hive metastore failed.");
- } finally {
- client.close();
- }
-            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
+            List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
+            remoteIterators = getRemoteIterator(hivePartitions, configuration, isSecurityEnabled, properties, onS3);
} else {
// hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
+            remoteIterators = getRemoteIterator(remoteHiveTbl, configuration, isSecurityEnabled, properties, onS3);
+ }
+        return getAllFileStatus(fileStatuses, remoteIterators, configuration, isSecurityEnabled, properties, onS3);
+ }
+
+ /**
+ * list partitions from hiveMetaStore.
+ *
+ * @param metaStoreUris hiveMetaStore uris
+ * @param remoteHiveTbl Hive table
+ * @param hivePartitionPredicate filter when list partitions
+ * @return a list of hive partitions
+     * @throws DdlException when connecting to the hiveMetaStore fails.
+ */
+    public static List<Partition> getHivePartitions(String metaStoreUris, Table remoteHiveTbl,
+            ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException {
+ List<Partition> hivePartitions = new ArrayList<>();
+ HiveMetaStoreClient client = getClient(metaStoreUris);
+ try {
+            client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(),
+                    SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1,
+                    hivePartitions);
+ } catch (TException e) {
+ LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
+ throw new DdlException("Connect hive metastore failed.");
+ } finally {
+ client.close();
+ }
+ return hivePartitions;
+ }
+
+ // create Configuration for the given properties
+    private static Configuration getConfiguration(Map<String, String> properties, boolean onS3) {
+ Configuration configuration = new Configuration(false);
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+ }
+ if (onS3) {
+ setS3Configuration(configuration, properties);
+ }
+ return configuration;
+ }
+
+    // return true if Kerberos authentication is enabled
+ private static boolean isSecurityEnabled(Map<String, String> properties) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) && entry.getValue()
+                    .equals(AuthType.KERBEROS.getDesc())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Get remote iterators for given partitions
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions,
+            Configuration configuration, boolean isSecurityEnabled, Map<String, String> properties, boolean onS3)
+            throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> allIterators = new ArrayList<>();
+ for (Partition p : partitions) {
+ String location = p.getSd().getLocation();
+ Path path = new Path(location);
+            allIterators.addAll(getRemoteIterator(path, configuration, properties, isSecurityEnabled));
+ }
+ return allIterators;
+ }
+
+ // Get remote iterators for given table
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
+ String location = table.getSd().getLocation();
+ Path path = new Path(location);
+        return getRemoteIterator(path, configuration, properties, isSecurityEnabled);
+ }
+
+ // Get remote iterators for given Path
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(org.apache.hadoop.fs.Path path,
+            Configuration conf, Map<String, String> properties, boolean isSecurityEnabled) throws DdlException {
+ List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
+ try {
+ if (isSecurityEnabled) {
+ UserGroupInformation.setConfiguration(conf);
+ // login user from keytab
+                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+ properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+ }
+ FileSystem fileSystem = path.getFileSystem(conf);
+ iterators.add(fileSystem.listLocatedStatus(path));
+ } catch (IOException e) {
+            LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
+            throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
}
+ return iterators;
+ }
+    private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
+            List<RemoteIterator<LocatedFileStatus>> remoteIterators, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
String hdfsUrl = "";
- for (RemoteIterator<LocatedFileStatus> iterator : remoteIterators) {
+        Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
+ while (queue.peek() != null) {
+ RemoteIterator<LocatedFileStatus> iterator = queue.poll();
try {
while (iterator.hasNext()) {
LocatedFileStatus fileStatus = iterator.next();
+ if (fileStatus.isDirectory()) {
+                    // recursively visit the directory to collect the file paths.
+ queue.addAll(
+                            getRemoteIterator(fileStatus.getPath(), configuration, properties, isSecurityEnabled));
+ continue;
+ }
                 TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
brokerFileStatus.setIsDir(fileStatus.isDirectory());
brokerFileStatus.setIsSplitable(true);
@@ -226,7 +329,6 @@ public class HiveMetaStoreClientHelper {
throw new DdlException("List HDFS file failed.");
}
}
-
return hdfsUrl;
}
@@ -245,69 +347,6 @@ public class HiveMetaStoreClientHelper {
configuration.set("fs.s3a.attempts.maximum", "2");
}
- private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            List<Partition> partitions, Map<String, String> properties, boolean onS3)
- throws DdlException {
- List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
- Configuration configuration = new Configuration(false);
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
- configuration.set(entry.getKey(), entry.getValue());
- }
- }
- if (onS3) {
- setS3Configuration(configuration, properties);
- }
- for (Partition p : partitions) {
- String location = p.getSd().getLocation();
-            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
- try {
- FileSystem fileSystem = path.getFileSystem(configuration);
- iterators.add(fileSystem.listLocatedStatus(path));
- } catch (IOException e) {
-                LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
-                throw new DdlException("Get HDFS file remote iterator failed.");
- }
- }
- return iterators;
- }
-
- private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
- Table table, Map<String, String> properties, boolean onS3)
- throws DdlException {
- List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
- Configuration configuration = new Configuration(false);
- boolean isSecurityEnabled = false;
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
- configuration.set(entry.getKey(), entry.getValue());
- }
-            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
- && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
- isSecurityEnabled = true;
- }
- }
- if (onS3) {
- setS3Configuration(configuration, properties);
- }
- String location = table.getSd().getLocation();
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
- try {
- if (isSecurityEnabled) {
- UserGroupInformation.setConfiguration(configuration);
- // login user from keytab
-                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
- properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
- }
- FileSystem fileSystem = path.getFileSystem(configuration);
- iterators.add(fileSystem.listLocatedStatus(path));
- } catch (IOException e) {
-            LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
- throw new DdlException("Get HDFS file remote iterator failed.");
- }
- return iterators;
- }
-
     public static List<String> getPartitionNames(HiveTable hiveTable) throws DdlException {
         HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
List<String> partitionNames = new ArrayList<>();
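
Background on the fix, for anyone reviewing this cherry-pick: Hive tables
written by the Tez engine may place their data files in nested
subdirectories under the table or partition location (for example the
HIVE_UNION_SUBDIR_* directories produced by UNION queries), so a single
flat listLocatedStatus() call misses them. The patched getAllFileStatus()
therefore walks directories with an explicit queue of iterators. The
sketch below illustrates that traversal pattern in isolation; the class
and method names are illustrative only, not part of the Doris code base.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class RecursiveListingSketch {
    // Collect every regular file under the given root, descending into
    // subdirectories breadth-first with an explicit queue of iterators,
    // the same shape as the patched getAllFileStatus() loop.
    public static List<LocatedFileStatus> listFilesRecursively(Path root, Configuration conf)
            throws IOException {
        List<LocatedFileStatus> files = new ArrayList<>();
        FileSystem fs = root.getFileSystem(conf);
        Queue<RemoteIterator<LocatedFileStatus>> queue = new ArrayDeque<>();
        queue.add(fs.listLocatedStatus(root));
        while (!queue.isEmpty()) {
            RemoteIterator<LocatedFileStatus> iterator = queue.poll();
            while (iterator.hasNext()) {
                LocatedFileStatus status = iterator.next();
                if (status.isDirectory()) {
                    // A directory (e.g. a Tez union subdir): queue its
                    // listing instead of reporting it as a data file.
                    queue.add(fs.listLocatedStatus(status.getPath()));
                } else {
                    files.add(status);
                }
            }
        }
        return files;
    }
}

With this walk, a layout such as .../table/HIVE_UNION_SUBDIR_1/000000_0
is listed the same way as a flat layout: directories are expanded in
place and only regular files are reported, mirroring how the patched
code feeds TBrokerFileStatus entries.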