This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 15e471563d22ca82fea3c435a98f859bfaf99d6a
Author: Riza Suminto <[email protected]>
AuthorDate: Sun May 7 14:27:04 2023 -0700

    IMPALA-11123: Reimplement ORC optimized count star
    
    Commit 7ca20b3c94b1c9c1ddd4ed1e89f0969a0df55330 reverted the original
    optimized count(star) for ORC scan from commit
    f932d78ad0a30e322d59fc39072f710f889d2135 (gerrit review
    http://gerrit.cloudera.org:8080/18327). The revert was necessary since
    the unification of the count star and zero slot functions into
    HdfsColumnarScanner caused a significant regression for non-optimized
    count star queries in parquet format (over 15% slower
    MaterializeTupleTime).
    
    This patch reimplements optimized count(star) for ORC scan code path
    while minimizing the code changes needed for parquet scan code path.
    After this patch, ORC and parquet code path will have only the following
    new things in common:
    - THdfsScanNode.count_star_slot_offset renamed to
      THdfsScanNode.star_slot_offset
    - HdfsScanner::IssueFooterRanges will only issue footer ranges if
      IsZeroSlotTableScan() or optimize_count_star() is true (made possible
      for parquet by IMPALA-12631).
    
    The structure of HdfsParquetScanner::GetNextInternal() remains
    unchanged. Its zero scan slot code path is still served through num_rows
    metadata from the parquet footer, while the optimized count star code
    path still loops over row groups metadata (also from parquet footer).
    
    The following table shows single-node benchmark result of 3 count query
    variant on TPC-DS scale 10, both in ORC and parquet format, looped 9
    times.
    
    
+-----------+---------------------------+---------+--------+-------------+------------+
    | Workload  | Query                     | Format  | Avg(s) | Base Avg(s) | 
Delta(Avg) |
    
+-----------+---------------------------+---------+--------+-------------+------------+
    | TPCDS(10) | TPCDS-Q_COUNT_UNOPTIMIZED | orc     | 0.30   | 0.28        |  
 +6.50%   |
    | TPCDS(10) | TPCDS-Q_COUNT_OPTIMIZED   | parquet | 0.14   | 0.14        |  
 +1.56%   |
    | TPCDS(10) | TPCDS-Q_COUNT_ZERO_SLOT   | parquet | 0.27   | 0.27        |  
 +1.42%   |
    | TPCDS(10) | TPCDS-Q_COUNT_ZERO_SLOT   | orc     | 0.28   | 0.29        |  
 -3.03%   |
    | TPCDS(10) | TPCDS-Q_COUNT_UNOPTIMIZED | parquet | 0.21   | 0.22        |  
 -4.45%   |
    | TPCDS(10) | TPCDS-Q_COUNT_OPTIMIZED   | orc     | 0.14   | 0.21        | 
 -35.92%  |
    
+-----------+---------------------------+---------+--------+-------------+------------+
    
    Testing:
    - Restore PlannerTest.testOrcStatsAgg
    - Restore TestAggregationQueriesRunOnce and
      TestAggregationQueriesRunOnce::test_orc_count_star_optimization
    - Exercise count(star) in TestOrc::test_misaligned_orc_stripes
    - Pass core tests
    
    Change-Id: I5971c8f278e1dee44e2a8dd4d2f043d22ebf5d17
    Reviewed-on: http://gerrit.cloudera.org:8080/19927
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-scan-node-base.cc                 |   5 +-
 be/src/exec/hdfs-scan-node-base.h                  |  12 +-
 be/src/exec/hdfs-scanner.cc                        |   4 +-
 be/src/exec/orc/hdfs-orc-scanner.cc                |  45 ++-
 be/src/exec/orc/hdfs-orc-scanner.h                 |   1 -
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  11 +-
 be/src/exec/parquet/hdfs-parquet-scanner.h         |   2 -
 common/thrift/PlanNodes.thrift                     |   5 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  13 +-
 .../org/apache/impala/planner/PlannerTest.java     |   5 +
 .../queries/PlannerTest/orc-stats-agg.test         | 439 +++++++++++++++++++++
 .../queries/QueryTest/orc-stats-agg.test           | 164 ++++++++
 .../queries/QueryTest/partition-key-scans.test     |  21 +
 .../queries/QueryTest/scanners.test                |   3 +
 tests/query_test/test_aggregation.py               | 102 ++---
 tests/query_test/test_scanners.py                  |  12 +-
 tests/util/test_file_parser.py                     |  63 ++-
 17 files changed, 809 insertions(+), 98 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index c1ceb5c66..4ccd656a6 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -451,9 +451,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const 
HdfsScanPlanNode& pno
             hdfs_scan_node.skip_header_line_count :
             0),
     tuple_id_(pnode.tuple_id_),
-    parquet_count_star_slot_offset_(
-        hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
-            hdfs_scan_node.parquet_count_star_slot_offset :
+    count_star_slot_offset_(hdfs_scan_node.__isset.count_star_slot_offset ?
+            hdfs_scan_node.count_star_slot_offset :
             -1),
     is_partition_key_scan_(hdfs_scan_node.is_partition_key_scan),
     tuple_desc_(pnode.tuple_desc_),
diff --git a/be/src/exec/hdfs-scan-node-base.h 
b/be/src/exec/hdfs-scan-node-base.h
index 160d4d782..0517b1f3c 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -463,10 +463,8 @@ class HdfsScanNodeBase : public ScanNode {
   const AvroSchemaElement& avro_schema() const { return avro_schema_; }
   int skip_header_line_count() const { return skip_header_line_count_; }
   io::RequestContext* reader_context() const { return reader_context_.get(); }
-  bool optimize_parquet_count_star() const {
-    return parquet_count_star_slot_offset_ != -1;
-  }
-  int parquet_count_star_slot_offset() const { return 
parquet_count_star_slot_offset_; }
+  bool optimize_count_star() const { return count_star_slot_offset_ != -1; }
+  int count_star_slot_offset() const { return count_star_slot_offset_; }
   bool is_partition_key_scan() const { return is_partition_key_scan_; }
 
   typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>>
@@ -686,11 +684,11 @@ class HdfsScanNodeBase : public ScanNode {
   /// Tuple id of the tuple descriptor to be used.
   const int tuple_id_;
 
-  /// The byte offset of the slot for Parquet metadata if Parquet count star 
optimization
+  /// The byte offset of the slot for Parquet/ORC metadata if count star 
optimization
   /// is enabled. When set, this scan node can optimize a count(*) query by 
populating
   /// the tuple with data from the Parquet num rows statistic. See
-  /// applyParquetCountStartOptimization() in HdfsScanNode.java.
-  const int parquet_count_star_slot_offset_;
+  /// applyCountStarOptimization() in ScanNode.java.
+  const int count_star_slot_offset_;
 
   // True if this is a partition key scan that needs only to return at least 
one row from
   // each scan range. If true, the scan node and scanner implementations 
should attempt
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index e17b8d183..ff83ab616 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -1013,6 +1013,8 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* 
scan_node,
 
     // Try to find the split with the footer.
     ScanRange* footer_split = FindFooterSplit(files[i]);
+    bool footer_scanner =
+        scan_node->IsZeroSlotTableScan() || scan_node->optimize_count_star();
 
     for (int j = 0; j < files[i]->splits.size(); ++j) {
       ScanRange* split = files[i]->splits[j];
@@ -1023,7 +1025,7 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* 
scan_node,
       // groups. We only want a single node to process the file footer in this 
case,
       // which is the node with the footer split.  If it's not a count(*), we 
create a
       // footer range for the split always.
-      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
+      if (!footer_scanner || footer_split == split) {
         ScanRangeMetadata* split_metadata =
             static_cast<ScanRangeMetadata*>(split->meta_data());
         // Each split is processed by first issuing a scan range for the file 
footer, which
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc 
b/be/src/exec/orc/hdfs-orc-scanner.cc
index 6c8cf7822..64369ae97 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -406,9 +406,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
     row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME;
   }
 
-  if (UNLIKELY(scan_node_->optimize_parquet_count_star())) {
-    DCHECK(false);
-    return Status("Internal ERROR: ORC scanner cannot optimize count star 
slot.");
+  if (scan_node_->optimize_count_star()) {
+    DCHECK(!row_batches_need_validation_);
+    template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+    return Status::OK();
   }
 
   // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib 
can skip
@@ -779,10 +780,39 @@ Status HdfsOrcScanner::ProcessSplit() {
 }
 
 Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
-  // In case 'row_batches_need_validation_' is true, we need to look at the row
-  // batches and check their validity. In that case 'currentTransaction' is 
the only
-  // selected field from the file (in case of zero slot scans).
-  if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) {
+  if (scan_node_->optimize_count_star()) {
+    // There are no materialized slots, e.g. count(*) over the table.  We can 
serve
+    // this query from just the file metadata.  We don't need to read the 
column data.
+    // Only scanner of the footer split will run in this case. See the logic in
+    // HdfsScanner::IssueFooterRanges() and 
HdfsScanNodeBase::ReadsFileMetadataOnly().
+    DCHECK(!row_batches_need_validation_);
+    DCHECK(is_footer_scanner_);
+    uint64_t file_rows = reader_->getNumberOfRows();
+    DCHECK_LT(stripe_rows_read_, file_rows);
+    COUNTER_ADD(num_file_metadata_read_, 1);
+    int64_t tuple_buffer_size;
+    uint8_t* tuple_buffer;
+    int capacity = 1;
+    RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state_,
+        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), 
&capacity,
+        &tuple_buffer_size, &tuple_buffer));
+    Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buffer);
+    InitTuple(template_tuple_, dst_tuple);
+    int64_t* dst_slot = 
dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset());
+    *dst_slot = file_rows;
+    TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
+    dst_row->SetTuple(0, dst_tuple);
+    row_batch->CommitLastRow();
+    stripe_rows_read_ += file_rows;
+    COUNTER_ADD(scan_node_->rows_read_counter(), file_rows);
+    eos_ = true;
+    return Status::OK();
+  } else if (scan_node_->IsZeroSlotTableScan() && 
!row_batches_need_validation_) {
+    // In case 'row_batches_need_validation_' is true, we need to look at the 
row
+    // batches and check their validity. In that case 'currentTransaction' is 
the only
+    // selected field from the file (in case of zero slot scans).
+    // This block only handles the case when 'row_batches_need_validation_' is 
false.
+    DCHECK(is_footer_scanner_);
     uint64_t file_rows = reader_->getNumberOfRows();
     // There are no materialized slots, e.g. count(*) over the table.  We can 
serve
     // this query from just the file metadata.  We don't need to read the 
column data.
@@ -790,6 +820,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) 
{
       eos_ = true;
       return Status::OK();
     }
+    DCHECK_LT(stripe_rows_read_, file_rows);
     COUNTER_ADD(num_file_metadata_read_, 1);
     assemble_rows_timer_.Start();
     DCHECK_LT(stripe_rows_read_, file_rows);
diff --git a/be/src/exec/orc/hdfs-orc-scanner.h 
b/be/src/exec/orc/hdfs-orc-scanner.h
index bf52787bc..a066105de 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.h
+++ b/be/src/exec/orc/hdfs-orc-scanner.h
@@ -195,7 +195,6 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   friend class OrcStructReader;
   friend class OrcListReader;
   friend class OrcMapReader;
-  friend class HdfsOrcScannerTest;
 
   /// Memory guard of the tuple_mem_
   uint8_t* tuple_mem_end_ = nullptr;
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc 
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 6d41dc147..29df05da0 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -383,7 +383,7 @@ static bool CheckRowGroupOverlapsSplit(const 
parquet::RowGroup& row_group,
 
 int HdfsParquetScanner::CountScalarColumns(
     const vector<ParquetColumnReader*>& column_readers) {
-  DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
+  DCHECK(!column_readers.empty() || scan_node_->optimize_count_star());
   int num_columns = 0;
   stack<ParquetColumnReader*> readers;
   for (ParquetColumnReader* r: column_readers_) readers.push(r);
@@ -433,7 +433,8 @@ Status HdfsParquetScanner::ProcessSplit() {
 
 Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
-  if (scan_node_->optimize_parquet_count_star()) {
+  if (scan_node_->optimize_count_star()) {
+    // This is an optimized count(*) case.
     // Populate the single slot with the Parquet num rows statistic.
     DCHECK(is_footer_scanner_);
     int64_t tuple_buf_size;
@@ -447,8 +448,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* 
row_batch) {
       Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
       TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
       InitTuple(template_tuple_, dst_tuple);
-      int64_t* dst_slot =
-          
dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
+      int64_t* dst_slot = 
dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset());
       *dst_slot = 0;
       for (const auto &row_group : file_metadata_.row_groups) {
         *dst_slot += row_group.num_rows;
@@ -460,6 +460,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* 
row_batch) {
     eos_ = true;
     return Status::OK();
   } else if (scan_node_->IsZeroSlotTableScan()) {
+    DCHECK(is_footer_scanner_);
     // There are no materialized slots and we are not optimizing count(*), e.g.
     // "select 1 from alltypes". We can serve this query from just the file 
metadata.
     // We don't need to read the column data.
@@ -2836,7 +2837,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const 
TupleDescriptor& tuple_desc
   DCHECK(column_readers != nullptr);
   DCHECK(column_readers->empty());
 
-  if (scan_node_->optimize_parquet_count_star()) {
+  if (scan_node_->optimize_count_star()) {
     // Column readers are not needed because we are not reading from any 
columns if this
     // optimization is enabled.
     return Status::OK();
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h 
b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 5f5c038da..c09aa6a48 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -50,7 +50,6 @@ class ParquetColumnReader;
 class ParquetPageReader;
 template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
 class ScalarColumnReader;
-class BoolColumnReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content 
as tuples in
 /// the Impala in-memory representation of data, e.g.  (tuples, rows, row 
batches).
@@ -400,7 +399,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   friend class BaseScalarColumnReader;
   template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
   friend class ScalarColumnReader;
-  friend class BoolColumnReader;
   friend class HdfsParquetScannerTest;
   friend class ParquetPageIndex;
   friend class ParquetColumnChunkReader;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index cfee32a5a..1c7333571 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -329,9 +329,8 @@ struct THdfsScanNode {
   // The conjuncts that are eligible for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
 
-  // The byte offset of the slot for Parquet metadata if Parquet count star 
optimization
-  // is enabled.
-  10: optional i32 parquet_count_star_slot_offset
+  // The byte offset of the slot for counter if count star optimization is 
enabled.
+  10: optional i32 count_star_slot_offset
 
   // If true, the backend only needs to return one row per partition.
   11: optional bool is_partition_key_scan
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 60e4abc04..6dc5590f9 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -91,6 +91,7 @@ import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.MathUtil;
@@ -339,6 +340,8 @@ public class HdfsScanNode extends ScanNode {
   // Used only to display EXPLAIN information.
   private final List<Expr> partitionConjuncts_;
 
+  private boolean isFullAcidTable_ = false;
+
   /**
    * Construct a node to scan given data files into tuples described by 'desc',
    * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -359,6 +362,8 @@ public class HdfsScanNode extends ScanNode {
     tableNumRowsHint_ = hdfsTblRef.getTableNumRowsHint();
     FeFsTable hdfsTable = (FeFsTable)hdfsTblRef.getTable();
     Preconditions.checkState(tbl_ == hdfsTable);
+    isFullAcidTable_ =
+        
AcidUtils.isFullAcidTable(hdfsTable.getMetaStoreTable().getParameters());
     StringBuilder error = new StringBuilder();
     aggInfo_ = aggInfo;
     skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error);
@@ -403,13 +408,14 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Returns true if the Parquet count(*) optimization can be applied to the 
query block
+   * Returns true if the count(*) optimization can be applied to the query 
block
    * of this scan node.
    */
   protected boolean canApplyCountStarOptimization(Analyzer analyzer,
       Set<HdfsFileFormat> fileFormats) {
     if (fileFormats.size() != 1) return false;
-    if (!hasParquet(fileFormats)) return false;
+    if (isFullAcidTable_) return false;
+    if (!hasParquet(fileFormats) && !hasOrc(fileFormats)) return false;
     return canApplyCountStarOptimization(analyzer);
   }
 
@@ -1896,8 +1902,7 @@ public class HdfsScanNode extends ScanNode {
     msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
     Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == 
null));
     if (countStarSlot_ != null) {
-      msg.hdfs_scan_node.setParquet_count_star_slot_offset(
-          countStarSlot_.getByteOffset());
+      
msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
     }
     if (!statsConjuncts_.isEmpty()) {
       for (Expr e: statsConjuncts_) {
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b28d489eb..7449a1697 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1390,6 +1390,11 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("tpcds-dist-method", "tpcds");
   }
 
+  @Test
+  public void testOrcStatsAgg() {
+    runPlannerTestFile("orc-stats-agg");
+  }
+
   /**
    * Test new hint of 'TABLE_NUM_ROWS'
    */
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test
new file mode 100644
index 000000000..75265a0b0
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test
@@ -0,0 +1,439 @@
+# Verify that the ORC count(*) optimization is applied in all count(*) or
+# count(<literal>) cases when scanning an ORC table. In the last case, we are 
scanning
+# a text table, so the optimization is not applied. The optimization is 
observed when
+# the cardinality of the ORC scan (24) is the same as the # of files (24).
+select count(*) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(1) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(123) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(*) from functional.alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=4
+|
+|--08:AGGREGATE [FINALIZE]
+|  |  output: count(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+02:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  row-size=8B cardinality=1
+|
+01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=4
+|
+|--16:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  15:EXCHANGE [UNPARTITIONED]
+|  |
+|  08:AGGREGATE
+|  |  output: count(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  07:SCAN HDFS [functional.alltypes]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=0B cardinality=7.30K
+|
+|--14:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  13:EXCHANGE [UNPARTITIONED]
+|  |
+|  06:AGGREGATE
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+|--12:AGGREGATE [FINALIZE]
+|  |  output: count:merge(*)
+|  |  row-size=8B cardinality=1
+|  |
+|  11:EXCHANGE [UNPARTITIONED]
+|  |
+|  04:AGGREGATE
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=4B cardinality=24
+|
+10:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  row-size=8B cardinality=1
+|
+09:EXCHANGE [UNPARTITIONED]
+|
+02:AGGREGATE
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  row-size=8B cardinality=1
+|
+01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# Verify that the ORC count(*) optimization is applied even if there is more 
than
+# one item in the select list.
+select count(*), count(1), count(123) from 
functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# Select count(<partition col>) - the optimization is disabled because it's 
not a
+# count(<literal>) or count(*) aggregate function.
+select count(year) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(`year`)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=13.07K
+====
+# Group by partition columns.
+select month, count(*) from functional_orc_def.uncomp_src_alltypes group by 
month, year
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  group by: `month`, `year`
+|  row-size=16B cardinality=24
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=16B cardinality=24
+====
+# The optimization is disabled because tinyint_col is not a partition col.
+select tinyint_col, count(*) from functional_orc_def.uncomp_src_alltypes group 
by tinyint_col, year
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  group by: tinyint_col, `year`
+|  row-size=13B cardinality=13.07K
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=5B cardinality=13.07K
+====
+# The optimization is disabled because it can not be applied to the 1st 
aggregate
+# function.
+select avg(year), count(*) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: avg(`year`), count(*)
+|  row-size=16B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=13.07K
+====
+# Optimization is not applied because the inner count(*) is not materialized. 
The outer
+# count(*) does not reference a base table.
+select count(*) from (select count(*) from 
functional_orc_def.uncomp_src_alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+01:AGGREGATE [FINALIZE]
+|  row-size=0B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   partition key scan
+   row-size=0B cardinality=24
+====
+# The optimization is applied if count(*) is in the having clause.
+select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  having: count(*) > 1
+|  row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# The count(*) optimization is applied in the inline view.
+select count(*), count(a) from (select count(1) as a from 
functional_orc_def.uncomp_src_alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(*), count(count(*))
+|  row-size=16B cardinality=1
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# The count(*) optimization is applied to the inline view even if there is a 
join.
+select *
+from functional.alltypes x inner join (
+  select count(1) as a from functional_orc_def.uncomp_src_alltypes group by 
year
+) t on x.id = t.a;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: x.id = count(*)
+|  runtime filters: RF000 <- count(*)
+|  row-size=101B cardinality=2
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  |  group by: `year`
+|  |  row-size=12B cardinality=2
+|  |
+|  01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=12B cardinality=24
+|
+00:SCAN HDFS [functional.alltypes x]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> x.id
+   row-size=89B cardinality=7.30K
+====
+# The count(*) optimization is not applied if there is more than 1 table ref.
+select count(*) from functional_orc_def.uncomp_src_alltypes a, 
functional_orc_def.uncomp_src_alltypes b
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|  row-size=0B cardinality=170.85M
+|
+|--01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes b]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=0B cardinality=13.07K
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes a]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# The count(*) optimization is applied if all predicates are on partition 
columns only.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 
and month > 8;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `year` < 2010, `month` > 8
+   HDFS partitions=4/24 files=4 size=33.53KB
+   row-size=8B cardinality=4
+====
+# tinyint_col is not a partition column so the optimization is disabled.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 
and tinyint_col > 8;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `year` < 2010
+   HDFS partitions=12/24 files=12 size=102.74KB
+   predicates: tinyint_col > 8
+   row-size=1B cardinality=654
+====
+# Optimization is applied after constant folding.
+select count(1 + 2 + 3) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=4B cardinality=24
+====
+# Optimization is not applied to count(null).
+select count(1 + null + 3) from functional_orc_def.uncomp_src_alltypes
+union all
+select count(null) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=8B cardinality=2
+|
+|--04:AGGREGATE [FINALIZE]
+|  |  output: count(NULL)
+|  |  row-size=8B cardinality=1
+|  |
+|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+|     HDFS partitions=24/24 files=24 size=205.47KB
+|     row-size=0B cardinality=13.07K
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(NULL + 3)
+|  row-size=8B cardinality=1
+|
+01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# Optimization is not applied when selecting from an empty table.
+select count(*) from functional_orc_def.emptytable
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional_orc_def.emptytable]
+   partitions=0/0 files=0 size=0B
+   row-size=0B cardinality=0
+====
+# Optimization is not applied when all partitions are pruned.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `year` = -1
+   partitions=0/24 files=0 size=0B
+   row-size=0B cardinality=0
+====
+# Optimization is not applied across query blocks, even though it would be 
correct here.
+select count(*) from (select int_col from 
functional_orc_def.uncomp_src_alltypes) t
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# In general, optimization is not applied when there is a distinct agg.
+select count(*), count(distinct 1) from functional_orc_def.uncomp_src_alltypes
+---- PLAN
+PLAN-ROOT SINK
+|
+02:AGGREGATE [FINALIZE]
+|  output: count(1), count:merge(*)
+|  row-size=16B cardinality=1
+|
+01:AGGREGATE
+|  output: count(*)
+|  group by: 1
+|  row-size=9B cardinality=1
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   HDFS partitions=24/24 files=24 size=205.47KB
+   row-size=0B cardinality=13.07K
+====
+# The optimization is applied here because only the count(*) and a partition 
column are
+# materialized. Non-materialized agg exprs are ignored.
+select year, cnt from (
+  select year, count(bigint_col), count(*) cnt, avg(int_col)
+  from functional_orc_def.uncomp_src_alltypes
+  where month=1
+  group by year
+) t
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
+|  group by: `year`
+|  row-size=12B cardinality=2
+|
+00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
+   partition predicates: `month` = 1
+   HDFS partitions=2/24 files=2 size=17.07KB
+   row-size=12B cardinality=2
+====
+# Optimization is not applied when selecting from a full acid table.
+select count(*) from functional_orc_def.complextypestbl
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional_orc_def.complextypestbl]
+   HDFS partitions=1/1 files=2 size=4.04KB
+   row-size=0B cardinality=2.57K
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test 
b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test
new file mode 100644
index 000000000..db7656cde
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test
@@ -0,0 +1,164 @@
+====
+---- QUERY
+# Tests the correctness of the ORC count(*) optimization.
+select count(1)
+from functional_orc_def.uncomp_src_alltypes
+---- RESULTS
+7300
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 7300
+=====
+---- QUERY
+# Tests the correctness of zero slot scan over ORC.
+# Does not verify 'NumFileMetadataRead' here since codegen vs non-codegen yield
+# different numbers.
+select 1 from functional_orc_def.alltypestiny
+---- RESULTS
+1
+1
+1
+1
+1
+1
+1
+1
+---- TYPES
+tinyint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, RowsRead): 8
+=====
+---- QUERY
+# ORC count(*) optimization with predicates on the partition columns.
+select count(1)
+from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
+---- RESULTS
+1220
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, RowsRead): 1220
+=====
+---- QUERY
+# ORC count(*) optimization with group by partition columns.
+select year, month, count(1)
+from functional_orc_def.uncomp_src_alltypes group by year, month
+---- RESULTS
+2009,1,310
+2009,2,280
+2009,3,310
+2009,4,300
+2009,5,310
+2009,6,300
+2009,7,310
+2009,8,310
+2009,9,300
+2009,10,310
+2009,11,300
+2009,12,310
+2010,1,310
+2010,2,280
+2010,3,310
+2010,4,300
+2010,5,310
+2010,6,300
+2010,7,310
+2010,8,310
+2010,9,300
+2010,10,310
+2010,11,300
+2010,12,310
+---- TYPES
+int, int, bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 7300
+=====
+---- QUERY
+# ORC count(*) optimization with both group by and predicates on partition 
columns.
+select count(1)
+from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
+group by month
+---- RESULTS
+310
+300
+310
+300
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, RowsRead): 1220
+=====
+---- QUERY
+# ORC count(*) optimization with the result going into a join.
+select x.bigint_col from functional_orc_def.uncomp_src_alltypes x
+  inner join (
+    select count(1) as a from functional_orc_def.uncomp_src_alltypes group by 
year
+  ) t on x.id = t.a;
+---- RESULTS
+0
+0
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 24
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 14600
+=====
+---- QUERY
+# ORC count(*) optimization with the agg function in the having clause.
+select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
+---- RESULTS
+1
+---- TYPES
+tinyint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 24
+aggregation(SUM, RowsRead): 7300
+====
+---- QUERY
+# Verify that 0 is returned for count(*) on an empty table.
+select count(1) from functional_orc_def.emptytable
+---- RESULTS
+0
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 0
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# Verify that 0 is returned when all partitions are pruned.
+select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
+---- RESULTS
+0
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, NumFileMetadataRead): 0
+aggregation(SUM, RowsRead): 0
+=====
+---- QUERY
+# Verify count star over full acid table.
+# NumFileMetadataRead varies depending on the DISABLE_CODEGEN option.
+select count(*) from functional_orc_def.complextypestbl
+---- RESULTS
+8
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumOrcStripes): 0
+aggregation(SUM, RowsRead): 8
+=====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
 
b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
index 0d9e17330..3e5134378 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
@@ -11,6 +11,9 @@ INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 24
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Test with more complex multiple distinct aggregation.
@@ -23,6 +26,9 @@ BIGINT,BIGINT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 24
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Distinct aggregation with multiple columns.
@@ -58,6 +64,9 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 24
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition key scan combined with analytic function.
@@ -71,6 +80,9 @@ INT,BIGINT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 24
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition scan combined with sort.
@@ -107,6 +119,9 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 24
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition key scan combined with predicate on partition columns
@@ -121,6 +136,9 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 2
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumFileMetadataRead): 2
 ====
 ---- QUERY
 # Partition key scan combined with having predicate.
@@ -136,6 +154,9 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 24
+aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Empty table should not return any rows
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/scanners.test 
b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
index eff43d53d..6f58d56de 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
@@ -238,6 +238,9 @@ tinyint
 1
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 100
+---- RUNTIME_PROFILE: table_format=parquet,orc
+aggregation(SUM, RowsRead): 100
+aggregation(SUM, RowsReturned): 200
 ====
 ---- QUERY
 select year, count(*) from alltypes group by year
diff --git a/tests/query_test/test_aggregation.py 
b/tests/query_test/test_aggregation.py
index c7498ec06..aa7cfdb35 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -261,24 +261,6 @@ class TestAggregationQueries(ImpalaTestSuite):
       # Verify codegen was enabled for all four stages of the aggregation.
       assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])
 
-  def test_parquet_count_star_optimization(self, vector, unique_database):
-    if (vector.get_value('table_format').file_format != 'text' or
-        vector.get_value('table_format').compression_codec != 'none'):
-      # No need to run this test on all file formats
-      pytest.skip()
-    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
-    vector.get_value('exec_option')['batch_size'] = 1
-    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
-
-  def test_kudu_count_star_optimization(self, vector, unique_database):
-    if (vector.get_value('table_format').file_format != 'text' or
-       vector.get_value('table_format').compression_codec != 'none'):
-      # No need to run this test on all file formats
-      pytest.skip()
-    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
-    vector.get_value('exec_option')['batch_size'] = 1
-    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
-
   def test_ndv(self):
     """Test the version of NDV() that accepts a scale value argument against
     different column data types. The scale argument is an integer in range
@@ -322,17 +304,70 @@ class TestAggregationQueries(ImpalaTestSuite):
       for j in range(0, 11):
         assert(ndv_results[i - 1][j] == int(ndv_vals[j]))
 
-  def test_sampled_ndv(self, vector, unique_database):
+  def test_grouping_sets(self, vector):
+    """Tests for ROLLUP, CUBE and GROUPING SETS."""
+    if vector.get_value('table_format').file_format == 'hbase':
+      pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
+    self.run_test_case('QueryTest/grouping-sets', vector)
+
+  def test_aggregation_limit(self, vector):
+    """Test that limits are honoured when enforced by aggregation node."""
+    # 1-phase
+    result = self.execute_query(
+        "select distinct l_orderkey from tpch.lineitem limit 10",
+        vector.get_value('exec_option'))
+    assert len(result.data) == 10
+
+    # 2-phase with transpose
+    result = self.execute_query(
+        "select count(distinct l_discount), group_concat(distinct 
l_linestatus), "
+        "max(l_quantity) from tpch.lineitem group by l_tax, l_shipmode limit 
10;",
+        vector.get_value('exec_option'))
+    assert len(result.data) == 10
+
+
+class TestAggregationQueriesRunOnce(ImpalaTestSuite):
+  """Run the aggregation test suite similarly to TestAggregationQueries, but 
with a stricter
+  constraint. Each test in this class only runs once by setting the 
uncompressed text dimension
+  for all exploration strategies. However, they may not necessarily target the 
uncompressed text
+  table format. This also runs with codegen enabled and disabled to exercise 
our
+  non-codegen code."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAggregationQueriesRunOnce, cls).add_test_dimensions()
+
+    cls.ImpalaTestMatrix.add_dimension(
+      create_exec_option_dimension(disable_codegen_options=[False, True]))
+
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  def test_parquet_count_star_optimization(self, vector, unique_database):
+    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
+
+  def test_kudu_count_star_optimization(self, vector):
+    self.run_test_case('QueryTest/kudu-stats-agg', vector)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/kudu-stats-agg', vector)
+
+  def test_orc_count_star_optimization(self, vector):
+    self.run_test_case('QueryTest/orc-stats-agg', vector)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/orc-stats-agg', vector)
+
+  def test_sampled_ndv(self, vector):
     """The SAMPLED_NDV() function is inherently non-deterministic and cannot be
     reasonably made deterministic with existing options so we test it 
separately.
     The goal of this test is to ensure that SAMPLED_NDV() works on all data 
types
     and returns approximately sensible estimates. It is not the goal of this 
test
     to ensure tight error bounds on the NDV estimates. SAMPLED_NDV() is 
expected
     be inaccurate on small data sets like the ones we use in this test."""
-    if (vector.get_value('table_format').file_format != 'text' or
-        vector.get_value('table_format').compression_codec != 'none'):
-      # No need to run this test on all file formats
-      pytest.skip()
 
     # NDV() is used a baseline to compare SAMPLED_NDV(). Both NDV() and 
SAMPLED_NDV()
     # are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() 
can do.
@@ -386,27 +421,6 @@ class TestAggregationQueries(ImpalaTestSuite):
       for i in range(14, 16):
         self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, 
int(ndv_vals[i]), 2.0)
 
-  def test_grouping_sets(self, vector):
-    """Tests for ROLLUP, CUBE and GROUPING SETS."""
-    if vector.get_value('table_format').file_format == 'hbase':
-      pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
-    self.run_test_case('QueryTest/grouping-sets', vector)
-
-  def test_aggregation_limit(self, vector):
-    """Test that limits are honoured when enforced by aggregation node."""
-    # 1-phase
-    result = self.execute_query(
-        "select distinct l_orderkey from tpch.lineitem limit 10",
-        vector.get_value('exec_option'))
-    assert len(result.data) == 10
-
-    # 2-phase with transpose
-    result = self.execute_query(
-        "select count(distinct l_discount), group_concat(distinct 
l_linestatus), "
-        "max(l_quantity) from tpch.lineitem group by l_tax, l_shipmode limit 
10;",
-        vector.get_value('exec_option'))
-    assert len(result.data) == 10
-
 
 class TestDistinctAggregation(ImpalaTestSuite):
   """Run the distinct aggregation test suite, with codegen and 
shuffle_distinct_exprs
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 88e9abf83..daa163504 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1682,7 +1682,8 @@ class TestOrc(ImpalaTestSuite):
   def _misaligned_orc_stripes_helper(
           self, table_name, rows_in_table, num_scanners_with_no_reads=0):
     """Checks if 'num_scanners_with_no_reads' indicates the expected number of 
scanners
-    that don't read anything because the underlying file is poorly formatted
+    that don't read anything because the underlying file is poorly formatted.
+    Additionally, tests that select count(star) matches the expected number of 
rows.
     """
     query = 'select * from %s' % table_name
     result = self.client.execute(query)
@@ -1703,6 +1704,11 @@ class TestOrc(ImpalaTestSuite):
       total += int(n)
     assert total == num_scanners_with_no_reads
 
+    # Test that select count(star) matches the expected number of rows.
+    query = 'select count(*) from %s' % table_name
+    result = self.client.execute(query)
+    assert int(result.data[0]) == rows_in_table
+
   # Skip this test on non-HDFS filesystems, because orc-type-check.test 
contains Hive
   # queries that hang in some cases (IMPALA-9345). It would be possible to 
separate
   # the tests that use Hive and run most tests on S3, but I think that running 
these on
@@ -1836,13 +1842,13 @@ class TestOrc(ImpalaTestSuite):
         "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC",
         unique_database, test_name, test_files)
     err = self.execute_query_expect_failure(self.client,
-        "select count(*) from {0}.{1}".format(unique_database, test_name))
+        "select count(id) from {0}.{1}".format(unique_database, test_name))
     assert expected_error in str(err)
 
   def test_invalid_schema(self, vector, unique_database):
     """Test scanning of ORC file with malformed schema."""
     self._run_invalid_schema_test(unique_database, "corrupt_schema",
-        "Encountered parse error during schema selection")
+        "Encountered parse error in tail of ORC file")
     self._run_invalid_schema_test(unique_database, "corrupt_root_type",
         "Root of the selected type returned by the ORC lib is not STRUCT: 
boolean.")
 
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index a5ddb9670..8bd2973d4 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -34,6 +34,8 @@ LOG = logging.getLogger('impala_test_suite')
 # constants
 SECTION_DELIMITER = "===="
 SUBSECTION_DELIMITER = "----"
+ALLOWED_TABLE_FORMATS = ['kudu', 'parquet', 'orc']
+
 
 # The QueryTestSectionReader provides utility functions that help to parse 
content
 # from a query test file
@@ -73,16 +75,18 @@ class QueryTestSectionReader(object):
     elif table_format.compression_codec == 'none':
       suffix = '_%s' % (table_format.file_format)
     elif table_format.compression_type == 'record':
-      suffix =  '_%s_record_%s' % (table_format.file_format,
+      suffix = '_%s_record_%s' % (table_format.file_format,
                                    table_format.compression_codec)
     else:
-      suffix =  '_%s_%s' % (table_format.file_format, 
table_format.compression_codec)
+      suffix = '_%s_%s' % (table_format.file_format, 
table_format.compression_codec)
     dataset = table_format.dataset.replace('-', '')
     return dataset + scale_factor + suffix
 
 
 def remove_comments(section_text):
-  return '\n'.join([l for l in section_text.split('\n') if not 
l.strip().startswith('#')])
+  lines = [line for line in section_text.split('\n') if not 
line.strip().startswith('#')]
+  return '\n'.join(lines)
+
 
 def parse_query_test_file(file_name, valid_section_names=None, encoding=None):
   """
@@ -102,6 +106,7 @@ def parse_query_test_file(file_name, 
valid_section_names=None, encoding=None):
   return parse_test_file(file_name, section_names, encoding=encoding,
       skip_unknown_sections=False)
 
+
 def parse_table_constraints(constraints_file):
   """Reads a table constraints file, if one exists"""
   schema_include = defaultdict(list)
@@ -134,11 +139,13 @@ def parse_table_constraints(constraints_file):
           raise ValueError('Unknown constraint type: %s' % constraint_type)
   return schema_include, schema_exclude, schema_only
 
+
 def parse_table_format_constraint(table_format_constraint):
   # TODO: Expand how we parse table format constraints to support syntax such 
as
   # a table format string with a wildcard character. Right now we don't do 
anything.
   return table_format_constraint
 
+
 def parse_test_file(test_file_name, valid_section_names, 
skip_unknown_sections=True,
     encoding=None):
   """
@@ -169,6 +176,24 @@ def parse_test_file(test_file_name, valid_section_names, 
skip_unknown_sections=T
     return parse_test_file_text(file_data, valid_section_names,
                                 skip_unknown_sections)
 
+
+def parse_runtime_profile_table_formats(subsection_comment):
+  prefix = "table_format="
+  if not subsection_comment.startswith(prefix):
+    raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the form '
+                       '"table_format=FORMAT[,FORMAT2,...]"' % 
subsection_comment)
+
+  parsed_formats = subsection_comment[len(prefix):].split(',')
+  table_formats = list()
+  for table_format in parsed_formats:
+    if table_format not in ALLOWED_TABLE_FORMATS:
+      raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' %
+                         (table_format, ALLOWED_TABLE_FORMATS))
+    else:
+      table_formats.append(table_format)
+  return table_formats
+
+
 def parse_test_file_text(text, valid_section_names, 
skip_unknown_sections=True):
   sections = list()
   section_start_regex = re.compile(r'(?m)^%s' % SECTION_DELIMITER)
@@ -199,8 +224,11 @@ def parse_test_file_text(text, valid_section_names, 
skip_unknown_sections=True):
       subsection_comment = None
 
       subsection_info = [s.strip() for s in subsection_name.split(':')]
-      if(len(subsection_info) == 2):
+      if len(subsection_info) == 2:
         subsection_name, subsection_comment = subsection_info
+        if subsection_comment == "":
+          # ignore empty subsection_comment
+          subsection_comment = None
 
       lines_content = lines[1:-1]
 
@@ -237,7 +265,7 @@ def parse_test_file_text(text, valid_section_names, 
skip_unknown_sections=True):
 
       if subsection_name == 'CATCH':
         parsed_sections['CATCH'] = list()
-        if subsection_comment == None:
+        if subsection_comment is None:
           parsed_sections['CATCH'].append(subsection_str)
         elif subsection_comment == 'ANY_OF':
           parsed_sections['CATCH'].extend(lines_content)
@@ -254,28 +282,24 @@ def parse_test_file_text(text, valid_section_names, 
skip_unknown_sections=True):
       # will be verified against the DML_RESULTS. Using both DML_RESULTS and 
RESULTS is
       # not supported.
       if subsection_name == 'DML_RESULTS':
-        if subsection_comment is None or subsection_comment == '':
-          raise RuntimeError('DML_RESULTS requires that the table is specified 
' \
+        if subsection_comment is None:
+          raise RuntimeError('DML_RESULTS requires that the table is specified 
'
               'in the comment.')
         parsed_sections['DML_RESULTS_TABLE'] = subsection_comment
         parsed_sections['VERIFIER'] = 'VERIFY_IS_EQUAL_SORTED'
 
       # The RUNTIME_PROFILE section is used to specify lines of text that 
should be
       # present in the query runtime profile. It takes an option comment 
containing a
-      # table format. RUNTIME_PROFILE secions with a comment are only 
evaluated for the
+      # table format. RUNTIME_PROFILE sections with a comment are only 
evaluated for the
       # specified format. If there is a RUNTIME_PROFILE section without a 
comment, it is
       # evaluated for all formats that don't have a commented section for this 
query.
       if subsection_name == 'RUNTIME_PROFILE':
-        if subsection_comment is not None and subsection_comment is not "":
-          allowed_formats = ['kudu']
-          if not subsection_comment.startswith("table_format="):
-            raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the 
form '
-              '"table_format=FORMAT"' % subsection_comment)
-          table_format = subsection_comment[13:]
-          if table_format not in allowed_formats:
-            raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: 
%s' %
-                (table_format, allowed_formats))
-          subsection_name = 'RUNTIME_PROFILE_%s' % table_format
+        if subsection_comment:
+          table_formats = 
parse_runtime_profile_table_formats(subsection_comment)
+          for table_format in table_formats:
+            subsection_name_for_format = 'RUNTIME_PROFILE_%s' % table_format
+            parsed_sections[subsection_name_for_format] = subsection_str
+          continue
 
       parsed_sections[subsection_name] = subsection_str
 
@@ -283,6 +307,7 @@ def parse_test_file_text(text, valid_section_names, 
skip_unknown_sections=True):
       sections.append(parsed_sections)
   return sections
 
+
 def split_section_lines(section_str):
   """
   Given a section string as produced by parse_test_file_text(), split it into 
separate
@@ -294,6 +319,7 @@ def split_section_lines(section_str):
   # Trim off the trailing newline and split into lines.
   return section_str[:-1].split('\n')
 
+
 def join_section_lines(lines):
   """
   The inverse of split_section_lines().
@@ -303,6 +329,7 @@ def join_section_lines(lines):
   """
   return '\n'.join(lines) + '\n'
 
+
 def write_test_file(test_file_name, test_file_sections, encoding=None):
   """
   Given a list of test file sections, write out the corresponding test file

Reply via email to