This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 13030f840a23e67c5e9923e8b1abab3b717c106a
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Feb 8 12:14:50 2024 -0800

    IMPALA-12796: Add is_footer_only in TFileSplitGeneratorSpec
    
    Several tests in test_scanners.py failed with wrong row counts on the S3
    target filesystem after IMPALA-12631. The S3 filesystem does not have
    blocks, so the planner produces TFileSplitGeneratorSpec instead of
    TScanRangeLocationList, and IMPALA-12631 missed making the necessary
    changes in TFileSplitGeneratorSpec. Meanwhile, it already changed the
    behavior of hdfs-parquet-scanner.cc: for each scan range, the new code
    loops over file_metadata_.row_groups, while the old code just took one
    entry of file_metadata_.row_groups after calling NextRowGroup().
    
    This patch addresses the issue by adding an is_footer_only field to
    TFileSplitGeneratorSpec and scheduling accordingly in scheduler.cc. It
    also adds the field 'is_footer_scanner_' in hdfs-columnar-scanner.h to
    check that the optimized count-star path is only applied to the footer
    range.
    
    Testing:
    - Pass core tests with S3 target filesystem.
    
    Change-Id: Iaa6e3c14debe68cf601131c6594774c8c695923e
    Reviewed-on: http://gerrit.cloudera.org:8080/21021
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-columnar-scanner.cc               |  5 ++++
 be/src/exec/hdfs-columnar-scanner.h                |  4 +++
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  1 +
 be/src/scheduling/scheduler-test-util.cc           |  1 +
 be/src/scheduling/scheduler-test-util.h            | 10 +++++--
 be/src/scheduling/scheduler-test.cc                | 21 +++++++++++++++
 be/src/scheduling/scheduler.cc                     | 13 +++++++--
 common/thrift/PlanNodes.thrift                     |  4 +++
 .../org/apache/impala/planner/HdfsScanNode.java    | 31 +++++++++++-----------
 9 files changed, 71 insertions(+), 19 deletions(-)

diff --git a/be/src/exec/hdfs-columnar-scanner.cc 
b/be/src/exec/hdfs-columnar-scanner.cc
index 518091612..7a1ad7744 100644
--- a/be/src/exec/hdfs-columnar-scanner.cc
+++ b/be/src/exec/hdfs-columnar-scanner.cc
@@ -81,6 +81,11 @@ HdfsColumnarScanner::~HdfsColumnarScanner() {}
 
 Status HdfsColumnarScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Open(context));
+  // Memorize 'is_footer_scanner_' here since 'stream_' can be released early.
+  const io::ScanRange* range = stream_->scan_range();
+  is_footer_scanner_ =
+      range->offset() + range->bytes_to_read() >= 
stream_->file_desc()->file_length;
+
   RuntimeProfile* profile = scan_node_->runtime_profile();
   num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile);
   num_scanners_with_no_reads_counter_ =
diff --git a/be/src/exec/hdfs-columnar-scanner.h 
b/be/src/exec/hdfs-columnar-scanner.h
index a1d0b2903..00276d5be 100644
--- a/be/src/exec/hdfs-columnar-scanner.h
+++ b/be/src/exec/hdfs-columnar-scanner.h
@@ -60,6 +60,10 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// top-level tuples. See AssembleRows() in the derived classes.
   boost::scoped_ptr<ScratchTupleBatch> scratch_batch_;
 
+  /// Indicate whether this is a footer scanner or not.
+  /// Assigned in HdfsColumnarScanner::Open().
+  bool is_footer_scanner_ = false;
+
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_ = nullptr;
 
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc 
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index d69e985d4..6d41dc147 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -435,6 +435,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* 
row_batch) {
   DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
   if (scan_node_->optimize_parquet_count_star()) {
     // Populate the single slot with the Parquet num rows statistic.
+    DCHECK(is_footer_scanner_);
     int64_t tuple_buf_size;
     uint8_t* tuple_buf;
     int capacity = 1;
diff --git a/be/src/scheduling/scheduler-test-util.cc 
b/be/src/scheduling/scheduler-test-util.cc
index 14e7eb620..97c32e724 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -411,6 +411,7 @@ void Plan::BuildScanRangeSpec(const TableName& table_name,
   thrift_spec->__set_file_desc(thrift_file);
   thrift_spec->__set_max_block_size(spec.block_size);
   thrift_spec->__set_is_splittable(spec.is_splittable);
+  thrift_spec->__set_is_footer_only(spec.is_footer_only);
   int32_t partition_path_hash = 
static_cast<int32_t>(HashUtil::Hash(partition_path.data(),
       partition_path.length(), 0));
   thrift_spec->__set_partition_path_hash(partition_path_hash);
diff --git a/be/src/scheduling/scheduler-test-util.h 
b/be/src/scheduling/scheduler-test-util.h
index 2b6cab078..7696e2c2b 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -202,8 +202,12 @@ struct Block {
 
 struct FileSplitGeneratorSpec {
   FileSplitGeneratorSpec() {}
-  FileSplitGeneratorSpec(int64_t length, int64_t block, bool splittable)
-    : length(length), block_size(block), is_splittable(splittable) {}
+  FileSplitGeneratorSpec(
+      int64_t length, int64_t block, bool splittable, bool is_footer_only = 
false)
+    : length(length),
+      block_size(block),
+      is_splittable(splittable),
+      is_footer_only(is_footer_only) {}
 
   /// Length of file for which to generate file splits.
   int64_t length = DEFAULT_FILE_SIZE;
@@ -213,6 +217,8 @@ struct FileSplitGeneratorSpec {
 
   bool is_splittable = true;
 
+  bool is_footer_only = false;
+
   static const int64_t DEFAULT_FILE_SIZE;
   static const int64_t DEFAULT_BLOCK_SIZE;
 };
diff --git a/be/src/scheduling/scheduler-test.cc 
b/be/src/scheduling/scheduler-test.cc
index 3a132a784..b1ee871df 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -642,6 +642,27 @@ TEST_F(SchedulerTest, TestGeneratedVariableSizeSplit) {
   EXPECT_EQ(300, result.NumTotalAssignedBytes());
 }
 
+TEST_F(SchedulerTest, TestGeneratedVariableSizeSplitFooterOnly) {
+  Cluster cluster;
+
+  cluster.AddHosts(3, true, true);
+
+  Schema schema(cluster);
+  schema.AddFileSplitGeneratorSpecs(
+      "T", {{100, 100, false, false}, {100, 1, true, true}, {100, 10, true, 
true}});
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+  plan.SetRandomReplica(true);
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  ASSERT_OK(scheduler.Compute(&result));
+
+  EXPECT_EQ(3, result.NumTotalAssignments());
+  EXPECT_EQ(111, result.NumTotalAssignedBytes());
+}
+
 TEST_F(SchedulerTest, TestBlockAndGenerateSplit) {
   Cluster cluster;
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 7f16a4058..287407389 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -121,8 +121,17 @@ Status Scheduler::GenerateScanRanges(const 
vector<TFileSplitGeneratorSpec>& spec
 
     long scan_range_offset = 0;
     long remaining = fb_desc->length();
-    long scan_range_length = std::min(spec.max_block_size, fb_desc->length());
-    if (!spec.is_splittable) scan_range_length = fb_desc->length();
+    long scan_range_length = fb_desc->length();
+
+    if (spec.is_splittable) {
+      scan_range_length = std::min(spec.max_block_size, fb_desc->length());
+      if (spec.is_footer_only) {
+        scan_range_offset = fb_desc->length() - scan_range_length;
+        remaining = scan_range_length;
+      }
+    } else {
+      DCHECK(!spec.is_footer_only);
+    }
 
     while (remaining > 0) {
       THdfsFileSplit hdfs_scan_range;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c957a5af7..cfee32a5a 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -271,6 +271,10 @@ struct TFileSplitGeneratorSpec {
 
   // Hash of the partition path
   5: required i32 partition_path_hash
+
+  // True if only footer range (the last block in file) is needed.
+  // If True, is_splittable must also be True as well.
+  6: required bool is_footer_only
 }
 
 // Specification of an individual data range which is held in its entirety
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index ed926adef..60e4abc04 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1253,23 +1253,24 @@ public class HdfsScanNode extends ScanNode {
           totalBytesPerFsEC_.merge(fsType, fileDesc.getFileLength(), 
Long::sum);
         }
 
+        // If parquet count star optimization is enabled, we only need the
+        // 'RowGroup.num_rows' in file metadata, thus only the scan range that 
contains
+        // a file footer is required.
+        // IMPALA-8834 introduced the optimization for partition key scan by 
generating
+        // one scan range for each HDFS file. With Parquet and ORC, we only 
need to get
+        // the scan range that contains a file footer for short-circuiting.
+        boolean isFooterOnly = countStarSlot_ != null
+            || (isPartitionKeyScan_
+                && (partition.getFileFormat().isParquetBased()
+                    || partition.getFileFormat() == HdfsFileFormat.ORC));
+
         if (!fsHasBlocks) {
           Preconditions.checkState(fileDesc.getNumFileBlocks() == 0);
           generateScanRangeSpecs(
-              partition, partitionLocation, fileDesc, scanRangeBytesLimit);
+              partition, partitionLocation, fileDesc, scanRangeBytesLimit, 
isFooterOnly);
         } else {
           // Skips files that have no associated blocks.
           if (fileDesc.getNumFileBlocks() == 0) continue;
-          // If parquet count star optimization is enabled, we only need the
-          // 'RowGroup.num_rows' in file metadata, thus only the scan range 
that contains
-          // a file footer is required.
-          // IMPALA-8834 introduced the optimization for partition key scan by 
generating
-          // one scan range for each HDFS file. With Parquet and ORC, we only 
need to get
-          // the scan range that contains a file footer for short-circuiting.
-          boolean isFooterOnly = countStarSlot_ != null
-              || (isPartitionKeyScan_
-                  && (partition.getFileFormat().isParquetBased()
-                      || partition.getFileFormat() == HdfsFileFormat.ORC));
           Pair<Boolean, Long> result =
               transformBlocksToScanRanges(partition, partitionLocation, 
fsType, fileDesc,
                   fsHasBlocks, scanRangeBytesLimit, analyzer, isFooterOnly);
@@ -1371,17 +1372,17 @@ public class HdfsScanNode extends ScanNode {
    * FeFsPartition can be expensive.
    */
   private void generateScanRangeSpecs(FeFsPartition partition, String 
partitionLocation,
-      FileDescriptor fileDesc, long maxBlockSize) {
+      FileDescriptor fileDesc, long maxBlockSize, boolean isFooterOnly) {
     Preconditions.checkArgument(fileDesc.getNumFileBlocks() == 0);
     Preconditions.checkArgument(maxBlockSize > 0);
     if (fileDesc.getFileLength() <= 0) return;
     boolean splittable = partition.getFileFormat().isSplittable(
         HdfsCompression.fromFileName(fileDesc.getPath()));
+    isFooterOnly &= splittable;
     // Hashing must use String.hashCode() for consistency.
     int partitionHash = partitionLocation.hashCode();
-    TFileSplitGeneratorSpec splitSpec = new TFileSplitGeneratorSpec(
-        fileDesc.toThrift(), maxBlockSize, splittable, partition.getId(),
-        partitionHash);
+    TFileSplitGeneratorSpec splitSpec = new 
TFileSplitGeneratorSpec(fileDesc.toThrift(),
+        maxBlockSize, splittable, partition.getId(), partitionHash, 
isFooterOnly);
     scanRangeSpecs_.addToSplit_specs(splitSpec);
     long scanRangeBytes = Math.min(maxBlockSize, fileDesc.getFileLength());
     if (splittable && !isPartitionKeyScan_) {

Reply via email to