This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c581855a41 [fix](hive-table) fix bug that hive external table can not query table created by Tez (#11345)
c581855a41 is described below

commit c581855a410343230bbfe6b259c6241774dc2f37
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Aug 3 09:07:47 2022 +0800

    [fix](hive-table) fix bug that hive external table can not query table created by Tez (#11345)

    * [fix](hive-table) fix bug that hive external table can not query table created by Tez

    If the Hive table is created by Tez, the location of the table is a second-level directory, e.g.:

    /user/hive/warehouse/region_tmp_union_all/
    ---/user/hive/warehouse/region_tmp_union_all/1
    ---/user/hive/warehouse/region_tmp_union_all/2

    We should traverse the directory recursively to get the real files.
---
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 163 ++++++++++++---------
 .../planner/external/ExternalFileScanNode.java     |   4 +-
 .../planner/external/ExternalHiveScanProvider.java |  49 ++++---
 3 files changed, 125 insertions(+), 91 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index a364568ed7..ac15875f0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -37,10 +37,12 @@ import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -74,6 +76,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -171,6 +174,7 @@ public class HiveMetaStoreClientHelper {
 
     /**
      * Get data files of partitions in hive table, filter by partition predicate.
+     *
      * @param hiveTable
      * @param hivePartitionPredicate
     * @param fileStatuses
@@ -179,25 +183,108 @@ public class HiveMetaStoreClientHelper {
      * @throws DdlException
      */
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
-            List<TBrokerFileStatus> fileStatuses,
-            Table remoteHiveTbl, StorageBackend.StorageType type) throws DdlException {
+            List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
+            throws DdlException {
+        boolean onS3 = type.equals(StorageBackend.StorageType.S3);
+        Map<String, String> properties = hiveTable.getHiveProperties();
+        Configuration configuration = getConfiguration(properties, onS3);
+        boolean isSecurityEnabled = isSecurityEnabled(properties);
+
         List<RemoteIterator<LocatedFileStatus>> remoteIterators;
-        Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
         if (remoteHiveTbl.getPartitionKeys().size() > 0) {
             String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
             // hive partitioned table, get file iterator from table partition sd info
             List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
-            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
+            remoteIterators = getRemoteIterator(hivePartitions, configuration, isSecurityEnabled, properties, onS3);
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
+            remoteIterators = getRemoteIterator(remoteHiveTbl, configuration, isSecurityEnabled, properties, onS3);
+        }
+        return getAllFileStatus(fileStatuses, remoteIterators, configuration, isSecurityEnabled, properties, onS3);
+    }
+
+    // create Configuration for the given properties
+    private static Configuration getConfiguration(Map<String, String> properties, boolean onS3) {
+        Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
+        return configuration;
+    }
+
+    // return true if it is kerberos
+    private static boolean isSecurityEnabled(Map<String, String> properties) {
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) && entry.getValue()
+                    .equals(AuthType.KERBEROS.getDesc())) {
+                return true;
+            }
         }
+        return false;
+    }
 
+    // Get remote iterators for given partitions
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions,
+            Configuration configuration, boolean isSecurityEnabled, Map<String, String> properties, boolean onS3)
+            throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> allIterators = new ArrayList<>();
+        for (Partition p : partitions) {
+            String location = p.getSd().getLocation();
+            Path path = new Path(location);
+            allIterators.addAll(getRemoteIterator(path, configuration, properties, isSecurityEnabled));
+        }
+        return allIterators;
+    }
+
+    // Get remote iterators for given table
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
+        String location = table.getSd().getLocation();
+        Path path = new Path(location);
+        return getRemoteIterator(path, configuration, properties, isSecurityEnabled);
+    }
+
+    // Get remote iterators for given Path
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(org.apache.hadoop.fs.Path path,
+            Configuration conf, Map<String, String> properties, boolean isSecurityEnabled) throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
+        try {
+            if (isSecurityEnabled) {
+                UserGroupInformation.setConfiguration(conf);
+                // login user from keytab
+                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                        properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
+            FileSystem fileSystem = path.getFileSystem(conf);
+            iterators.add(fileSystem.listLocatedStatus(path));
+        } catch (IOException e) {
+            LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
+            throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
+        }
+        return iterators;
+    }
+
+    private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
+            List<RemoteIterator<LocatedFileStatus>> remoteIterators, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
         String hdfsUrl = "";
-        for (RemoteIterator<LocatedFileStatus> iterator : remoteIterators) {
+        Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
+        while (queue.peek() != null) {
+            RemoteIterator<LocatedFileStatus> iterator = queue.poll();
             try {
                 while (iterator.hasNext()) {
                     LocatedFileStatus fileStatus = iterator.next();
+                    if (fileStatus.isDirectory()) {
+                        // recursively visit the directory to get the file paths.
+                        queue.addAll(
+                                getRemoteIterator(fileStatus.getPath(), configuration, properties, isSecurityEnabled));
+                        continue;
+                    }
                     TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
                     brokerFileStatus.setIsDir(fileStatus.isDirectory());
                     brokerFileStatus.setIsSplitable(true);
@@ -226,7 +313,6 @@ public class HiveMetaStoreClientHelper {
                 throw new DdlException("List HDFS file failed. Error: " + e.getMessage());
             }
         }
-
         return hdfsUrl;
     }
 
@@ -271,69 +357,6 @@ public class HiveMetaStoreClientHelper {
         configuration.set("fs.s3a.attempts.maximum", "2");
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            List<Partition> partitions, Map<String, String> properties, boolean onS3)
-            throws DdlException {
-        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
-                configuration.set(entry.getKey(), entry.getValue());
-            }
-        }
-        if (onS3) {
-            setS3Configuration(configuration, properties);
-        }
-        for (Partition p : partitions) {
-            String location = p.getSd().getLocation();
-            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
-            try {
-                FileSystem fileSystem = path.getFileSystem(configuration);
-                iterators.add(fileSystem.listLocatedStatus(path));
-            } catch (IOException e) {
-                LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
Error: " + e.getMessage()); - } - } - return iterators; - } - - private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator( - Table table, Map<String, String> properties, boolean onS3) - throws DdlException { - List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>(); - Configuration configuration = new Configuration(false); - boolean isSecurityEnabled = false; - for (Map.Entry<String, String> entry : properties.entrySet()) { - if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) { - configuration.set(entry.getKey(), entry.getValue()); - } - if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) - && entry.getValue().equals(AuthType.KERBEROS.getDesc())) { - isSecurityEnabled = true; - } - } - if (onS3) { - setS3Configuration(configuration, properties); - } - String location = table.getSd().getLocation(); - org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location); - try { - if (isSecurityEnabled) { - UserGroupInformation.setConfiguration(configuration); - // login user from keytab - UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL), - properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB)); - } - FileSystem fileSystem = path.getFileSystem(configuration); - iterators.add(fileSystem.listLocatedStatus(path)); - } catch (IOException e) { - LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage()); - throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage()); - } - return iterators; - } - public static List<String> getPartitionNames(HiveTable hiveTable) throws DdlException { HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS)); List<String> partitionNames = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 6702f689cb..1ae05ff630 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -272,8 +272,8 @@ public class ExternalFileScanNode extends ExternalScanNode { try { buildScanRange(); } catch (IOException e) { - LOG.error("Finalize failed.", e); - throw new UserException("Finalize failed.", e); + LOG.warn("Finalize failed.", e); + throw new UserException("Finalize failed: " + e.getMessage()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java index 39ac68773a..eb39a736a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java @@ -39,6 +39,8 @@ import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; @@ -50,6 +52,8 @@ import java.util.stream.Collectors; * A HiveScanProvider to get information for scan node. 
  */
 public class ExternalHiveScanProvider implements ExternalFileScanProvider {
+    private static final Logger LOG = LogManager.getLogger(ExternalHiveScanProvider.class);
+
     protected HMSExternalTable hmsTable;
 
     public ExternalHiveScanProvider(HMSExternalTable hmsTable) {
@@ -112,33 +116,40 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
         InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
         List<InputSplit> splits;
         if (!hivePartitions.isEmpty()) {
-            splits = hivePartitions.stream()
-                    .flatMap(x -> getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream())
-                    .collect(Collectors.toList());
+            try {
+                splits = hivePartitions.stream().flatMap(x -> {
+                    try {
+                        return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }).collect(Collectors.toList());
+            } catch (RuntimeException e) {
+                throw new IOException(e);
+            }
         } else {
             splits = getSplitsByPath(inputFormat, configuration, splitsPath);
         }
 
-        return HiveBucketUtil.getPrunedSplitsByBuckets(
-                splits,
-                hmsTable.getName(),
-                exprs,
-                getRemoteHiveTable().getSd().getBucketCols(),
-                getRemoteHiveTable().getSd().getNumBuckets(),
+        return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs,
+                getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(),
                 getRemoteHiveTable().getParameters());
     }
 
-    private List<InputSplit> getSplitsByPath(
-            InputFormat<?, ?> inputFormat,
-            Configuration configuration,
-            String splitsPath) {
+    private List<InputSplit> getSplitsByPath(InputFormat<?, ?> inputFormat, Configuration configuration,
+            String splitsPath) throws IOException {
         JobConf jobConf = new JobConf(configuration);
+        // For the Tez engine, it may generate subdirectories for "union" queries.
+        // So there may be files and directories in the table directory at the same time, e.g.:
+        // /user/hive/warehouse/region_tmp_union_all2/000000_0
+        // /user/hive/warehouse/region_tmp_union_all2/1
+        // /user/hive/warehouse/region_tmp_union_all2/2
+        // So we need to set this config to support visiting dirs recursively.
+        // Otherwise, getSplits() may throw an exception: "Not a file xxx"
+        // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
+        jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
         FileInputFormat.setInputPaths(jobConf, splitsPath);
-        try {
-            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
-            return Lists.newArrayList(splits);
-        } catch (IOException e) {
-            return new ArrayList<InputSplit>();
-        }
+        InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
+        return Lists.newArrayList(splits);
     }
 
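For readers who want to try the traversal idea outside of Doris, the queue-based listing that the patch performs in getAllFileStatus() can be reduced to a small standalone sketch. It only uses the public Hadoop FileSystem API; the class name and the table location are illustrative stand-ins, not part of the patch:

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class RecursiveListingSketch {
        // Collect all regular files under root, descending into subdirectories
        // (e.g. the numeric subdirectories a Tez "union" query leaves behind).
        static List<LocatedFileStatus> listFilesRecursively(Configuration conf, Path root) throws IOException {
            FileSystem fs = root.getFileSystem(conf);
            List<LocatedFileStatus> files = new ArrayList<>();
            Queue<RemoteIterator<LocatedFileStatus>> queue = new ArrayDeque<>();
            queue.add(fs.listLocatedStatus(root));
            while (!queue.isEmpty()) {
                RemoteIterator<LocatedFileStatus> iterator = queue.poll();
                while (iterator.hasNext()) {
                    LocatedFileStatus status = iterator.next();
                    if (status.isDirectory()) {
                        // Queue the subdirectory instead of returning it as a data file.
                        queue.add(fs.listLocatedStatus(status.getPath()));
                        continue;
                    }
                    files.add(status);
                }
            }
            return files;
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical table location; replace with a real path when trying this out.
            Path tableLocation = new Path("/user/hive/warehouse/region_tmp_union_all");
            for (LocatedFileStatus f : listFilesRecursively(new Configuration(), tableLocation)) {
                System.out.println(f.getPath() + " (" + f.getLen() + " bytes)");
            }
        }
    }

Hadoop also offers FileSystem.listFiles(path, true) for built-in recursive listing; the sketch simply mirrors the explicit queue approach used in the patch.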
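The split-generation side of the fix relies on Hadoop's mapreduce.input.fileinputformat.input.dir.recursive setting rather than manual traversal. Below is a minimal sketch of that flag in use; TextInputFormat and the input path are stand-ins (Doris obtains the real InputFormat via HiveUtil.getInputFormat() and the location from the table's storage descriptor), so treat it as an illustration of the configuration only:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class RecursiveSplitSketch {
        public static void main(String[] args) throws IOException {
            JobConf jobConf = new JobConf();
            // Without this flag, getSplits() fails with "Not a file: ..." when the
            // table directory contains the subdirectories generated by Tez.
            jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
            // Stand-in input format and path for illustration purposes.
            FileInputFormat.setInputPaths(jobConf, new Path("/user/hive/warehouse/region_tmp_union_all2"));
            TextInputFormat inputFormat = new TextInputFormat();
            inputFormat.configure(jobConf);
            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
            System.out.println("splits: " + splits.length);
        }
    }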