This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c581855a41 [fix](hive-table) fix bug that hive external table can not query table created by Tez (#11345)
c581855a41 is described below

commit c581855a410343230bbfe6b259c6241774dc2f37
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Aug 3 09:07:47 2022 +0800

    [fix](hive-table) fix bug that hive external table can not query table created by Tez (#11345)
    
    * [fix](hive-table) fix bug that hive external table can not query table created by Tez
    
    If the Hive table is created by Tez, the table's location contains second-level directories, e.g.:
    
    /user/hive/warehouse/region_tmp_union_all/
    ---/user/hive/warehouse/region_tmp_union_all/1
    ---/user/hive/warehouse/region_tmp_union_all/2
    
    We should recursively traverse the directory to get the real files.
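    
    As a rough, illustrative sketch of that traversal (not part of this patch;
    the class and method names below are made up), one can keep a queue of
    directory listings and enqueue a new listing whenever a subdirectory is
    encountered:
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.LocatedFileStatus;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.fs.RemoteIterator;
    
        import java.io.IOException;
        import java.util.ArrayDeque;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.Queue;
    
        // Illustrative helper, not part of the patch.
        public class RecursiveFileLister {
            // Collect all regular files under root, descending into
            // subdirectories such as .../region_tmp_union_all/1.
            public static List<LocatedFileStatus> listFiles(Path root, Configuration conf) throws IOException {
                FileSystem fs = root.getFileSystem(conf);
                List<LocatedFileStatus> files = new ArrayList<>();
                Queue<RemoteIterator<LocatedFileStatus>> queue = new ArrayDeque<>();
                queue.add(fs.listLocatedStatus(root));
                while (!queue.isEmpty()) {
                    RemoteIterator<LocatedFileStatus> it = queue.poll();
                    while (it.hasNext()) {
                        LocatedFileStatus status = it.next();
                        if (status.isDirectory()) {
                            // Defer the subdirectory rather than reporting it as a file.
                            queue.add(fs.listLocatedStatus(status.getPath()));
                        } else {
                            files.add(status);
                        }
                    }
                }
                return files;
            }
        }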
---
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 163 ++++++++++++---------
 .../planner/external/ExternalFileScanNode.java     |   4 +-
 .../planner/external/ExternalHiveScanProvider.java |  49 ++++---
 3 files changed, 125 insertions(+), 91 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index a364568ed7..ac15875f0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -37,10 +37,12 @@ import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -74,6 +76,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -171,6 +174,7 @@ public class HiveMetaStoreClientHelper {
 
     /**
      * Get data files of partitions in hive table, filter by partition predicate.
+     *
      * @param hiveTable
      * @param hivePartitionPredicate
      * @param fileStatuses
@@ -179,25 +183,108 @@ public class HiveMetaStoreClientHelper {
      * @throws DdlException
      */
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
-                                          List<TBrokerFileStatus> fileStatuses,
-                                          Table remoteHiveTbl, StorageBackend.StorageType type) throws DdlException {
+            List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
+            throws DdlException {
+        boolean onS3 = type.equals(StorageBackend.StorageType.S3);
+        Map<String, String> properties = hiveTable.getHiveProperties();
+        Configuration configuration = getConfiguration(properties, onS3);
+        boolean isSecurityEnabled = isSecurityEnabled(properties);
+
         List<RemoteIterator<LocatedFileStatus>> remoteIterators;
-        Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
         if (remoteHiveTbl.getPartitionKeys().size() > 0) {
             String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
             // hive partitioned table, get file iterator from table partition sd info
             List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
-            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
+            remoteIterators = getRemoteIterator(hivePartitions, configuration, isSecurityEnabled, properties, onS3);
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
+            remoteIterators = getRemoteIterator(remoteHiveTbl, configuration, isSecurityEnabled, properties, onS3);
+        }
+        return getAllFileStatus(fileStatuses, remoteIterators, configuration, isSecurityEnabled, properties, onS3);
+    }
+
+    // create Configuration for the given properties
+    private static Configuration getConfiguration(Map<String, String> properties, boolean onS3) {
+        Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
+        return configuration;
+    }
+
+    // return true if it is kerberos
+    private static boolean isSecurityEnabled(Map<String, String> properties) {
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) && entry.getValue()
+                    .equals(AuthType.KERBEROS.getDesc())) {
+                return true;
+            }
         }
+        return false;
+    }
 
+    // Get remote iterators for given partitions
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions,
+            Configuration configuration, boolean isSecurityEnabled, Map<String, String> properties, boolean onS3)
+            throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> allIterators = new ArrayList<>();
+        for (Partition p : partitions) {
+            String location = p.getSd().getLocation();
+            Path path = new Path(location);
+            allIterators.addAll(getRemoteIterator(path, configuration, properties, isSecurityEnabled));
+        }
+        return allIterators;
+    }
+
+    // Get remote iterators for given table
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
+        String location = table.getSd().getLocation();
+        Path path = new Path(location);
+        return getRemoteIterator(path, configuration, properties, isSecurityEnabled);
+    }
+
+    // Get remote iterators for given Path
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(org.apache.hadoop.fs.Path path,
+            Configuration conf, Map<String, String> properties, boolean isSecurityEnabled) throws DdlException {
+        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
+        try {
+            if (isSecurityEnabled) {
+                UserGroupInformation.setConfiguration(conf);
+                // login user from keytab
+                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                        properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
+            FileSystem fileSystem = path.getFileSystem(conf);
+            iterators.add(fileSystem.listLocatedStatus(path));
+        } catch (IOException e) {
+            LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
+            throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
+        }
+        return iterators;
+    }
+
+    private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
+            List<RemoteIterator<LocatedFileStatus>> remoteIterators, Configuration configuration,
+            boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
         String hdfsUrl = "";
-        for (RemoteIterator<LocatedFileStatus> iterator : remoteIterators) {
+        Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
+        while (queue.peek() != null) {
+            RemoteIterator<LocatedFileStatus> iterator = queue.poll();
             try {
                 while (iterator.hasNext()) {
                     LocatedFileStatus fileStatus = iterator.next();
+                    if (fileStatus.isDirectory()) {
+                        // recursively visit the directory to get the file paths.
+                        queue.addAll(
+                                getRemoteIterator(fileStatus.getPath(), configuration, properties, isSecurityEnabled));
+                        continue;
+                    }
                     TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
                     brokerFileStatus.setIsDir(fileStatus.isDirectory());
                     brokerFileStatus.setIsSplitable(true);
@@ -226,7 +313,6 @@ public class HiveMetaStoreClientHelper {
                 throw new DdlException("List HDFS file failed. Error: " + e.getMessage());
             }
         }
-
         return hdfsUrl;
     }
 
@@ -271,69 +357,6 @@ public class HiveMetaStoreClientHelper {
         configuration.set("fs.s3a.attempts.maximum", "2");
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            List<Partition> partitions, Map<String, String> properties, boolean onS3)
-            throws DdlException {
-        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
-        Configuration configuration = new Configuration(false);
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
-                configuration.set(entry.getKey(), entry.getValue());
-            }
-        }
-        if (onS3) {
-            setS3Configuration(configuration, properties);
-        }
-        for (Partition p : partitions) {
-            String location = p.getSd().getLocation();
-            org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
-            try {
-                FileSystem fileSystem = path.getFileSystem(configuration);
-                iterators.add(fileSystem.listLocatedStatus(path));
-            } catch (IOException e) {
-                LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
-                throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
-            }
-        }
-        return iterators;
-    }
-
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
-            Table table, Map<String, String> properties, boolean onS3)
-            throws DdlException {
-        List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
-        Configuration configuration = new Configuration(false);
-        boolean isSecurityEnabled = false;
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
-                configuration.set(entry.getKey(), entry.getValue());
-            }
-            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
-                    && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
-                isSecurityEnabled = true;
-            }
-        }
-        if (onS3) {
-            setS3Configuration(configuration, properties);
-        }
-        String location = table.getSd().getLocation();
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
-        try {
-            if (isSecurityEnabled) {
-                UserGroupInformation.setConfiguration(configuration);
-                // login user from keytab
-                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
-                        properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
-            }
-            FileSystem fileSystem = path.getFileSystem(configuration);
-            iterators.add(fileSystem.listLocatedStatus(path));
-        } catch (IOException e) {
-            LOG.warn("Get HDFS file remote iterator failed. {}" + e.getMessage());
-            throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
-        }
-        return iterators;
-    }
-
     public static List<String> getPartitionNames(HiveTable hiveTable) throws DdlException {
         HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
         List<String> partitionNames = new ArrayList<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 6702f689cb..1ae05ff630 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -272,8 +272,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
         try {
             buildScanRange();
         } catch (IOException e) {
-            LOG.error("Finalize failed.", e);
-            throw new UserException("Finalize failed.", e);
+            LOG.warn("Finalize failed.", e);
+            throw new UserException("Finalize failed: " + e.getMessage());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
index 39ac68773a..eb39a736a7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalHiveScanProvider.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -50,6 +52,8 @@ import java.util.stream.Collectors;
  * A HiveScanProvider to get information for scan node.
  */
 public class ExternalHiveScanProvider implements ExternalFileScanProvider {
+    private static final Logger LOG = LogManager.getLogger(ExternalHiveScanProvider.class);
+
     protected HMSExternalTable hmsTable;
 
     public ExternalHiveScanProvider(HMSExternalTable hmsTable) {
@@ -112,33 +116,40 @@ public class ExternalHiveScanProvider implements ExternalFileScanProvider {
         InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
         List<InputSplit> splits;
         if (!hivePartitions.isEmpty()) {
-            splits = hivePartitions.stream()
-                    .flatMap(x -> getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream())
-                    .collect(Collectors.toList());
+            try {
+                splits = hivePartitions.stream().flatMap(x -> {
+                    try {
+                        return getSplitsByPath(inputFormat, configuration, x.getSd().getLocation()).stream();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }).collect(Collectors.toList());
+            } catch (RuntimeException e) {
+                throw new IOException(e);
+            }
         } else {
             splits = getSplitsByPath(inputFormat, configuration, splitsPath);
         }
-        return HiveBucketUtil.getPrunedSplitsByBuckets(
-                splits,
-                hmsTable.getName(),
-                exprs,
-                getRemoteHiveTable().getSd().getBucketCols(),
-                getRemoteHiveTable().getSd().getNumBuckets(),
+        return HiveBucketUtil.getPrunedSplitsByBuckets(splits, hmsTable.getName(), exprs,
+                getRemoteHiveTable().getSd().getBucketCols(), getRemoteHiveTable().getSd().getNumBuckets(),
                 getRemoteHiveTable().getParameters());
     }
 
-    private List<InputSplit> getSplitsByPath(
-            InputFormat<?, ?> inputFormat,
-            Configuration configuration,
-            String splitsPath) {
+    private List<InputSplit> getSplitsByPath(InputFormat<?, ?> inputFormat, Configuration configuration,
+            String splitsPath) throws IOException {
         JobConf jobConf = new JobConf(configuration);
+        // For Tez engine, it may generate subdirectories for "union" queries.
+        // So there may be files and directories in the table directory at the same time. e.g.:
+        //      /user/hive/warehouse/region_tmp_union_all2/000000_0
+        //      /user/hive/warehouse/region_tmp_union_all2/1
+        //      /user/hive/warehouse/region_tmp_union_all2/2
+        // So we need to set this config to support visiting dirs recursively.
+        // Otherwise, getSplits() may throw an exception: "Not a file xxx"
+        // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
+        jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
         FileInputFormat.setInputPaths(jobConf, splitsPath);
-        try {
-            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
-            return Lists.newArrayList(splits);
-        } catch (IOException e) {
-            return new ArrayList<InputSplit>();
-        }
+        InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
+        return Lists.newArrayList(splits);
     }
 
 


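For reference, below is a minimal standalone sketch (not part of the commit;
the class name and input path are illustrative) showing the effect of the
"mapreduce.input.fileinputformat.input.dir.recursive" flag that
getSplitsByPath() sets above: without it, getSplits() fails with
"Not a file: ..." when the input directory contains subdirectories.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.util.ReflectionUtils;

    import java.io.IOException;

    // Illustrative demo, not part of the commit.
    public class RecursiveSplitsDemo {
        public static void main(String[] args) throws IOException {
            JobConf jobConf = new JobConf();
            // Allow FileInputFormat to descend into subdirectories
            // (e.g. those produced by Tez "union" queries).
            jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
            // Illustrative path; substitute a real table location.
            FileInputFormat.setInputPaths(jobConf, new Path("/user/hive/warehouse/region_tmp_union_all"));
            // ReflectionUtils.newInstance(...) also calls configure(jobConf).
            TextInputFormat inputFormat = ReflectionUtils.newInstance(TextInputFormat.class, jobConf);
            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
            System.out.println("got " + splits.length + " splits");
        }
    }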
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
