This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 15e471563d22ca82fea3c435a98f859bfaf99d6a Author: Riza Suminto <[email protected]> AuthorDate: Sun May 7 14:27:04 2023 -0700 IMPALA-11123: Reimplement ORC optimized count star Commit 7ca20b3c94b1c9c1ddd4ed1e89f0969a0df55330 reverted the original optimized count(star) for ORC scan from commit f932d78ad0a30e322d59fc39072f710f889d2135 (gerrit review http://gerrit.cloudera.org:8080/18327). The revert was necessary since the unification of count star and zero slot functions into HdfsColumnarScanner caused a significant regression for non-optimized count star queries in parquet format (over 15% slower MaterializeTupleTime). This patch reimplements optimized count(star) for the ORC scan code path while minimizing the code changes needed for the parquet scan code path. After this patch, the ORC and parquet code paths will have only the following new things in common: - THdfsScanNode.parquet_count_star_slot_offset renamed to THdfsScanNode.count_star_slot_offset - HdfsScanner::IssueFooterRanges will only issue footer ranges if IsZeroSlotTableScan() or optimize_count_star() is true (made possible for parquet by IMPALA-12631). The structure of HdfsParquetScanner::GetNextInternal() remains unchanged. Its zero scan slot code path is still served through num_rows metadata from the parquet footer, while the optimized count star code path still loops over row groups metadata (also from parquet footer). The following table shows single-node benchmark results of 3 count query variants on TPC-DS scale 10, both in ORC and parquet format, looped 9 times. 
+-----------+---------------------------+---------+--------+-------------+------------+ | Workload | Query | Format | Avg(s) | Base Avg(s) | Delta(Avg) | +-----------+---------------------------+---------+--------+-------------+------------+ | TPCDS(10) | TPCDS-Q_COUNT_UNOPTIMIZED | orc | 0.30 | 0.28 | +6.50% | | TPCDS(10) | TPCDS-Q_COUNT_OPTIMIZED | parquet | 0.14 | 0.14 | +1.56% | | TPCDS(10) | TPCDS-Q_COUNT_ZERO_SLOT | parquet | 0.27 | 0.27 | +1.42% | | TPCDS(10) | TPCDS-Q_COUNT_ZERO_SLOT | orc | 0.28 | 0.29 | -3.03% | | TPCDS(10) | TPCDS-Q_COUNT_UNOPTIMIZED | parquet | 0.21 | 0.22 | -4.45% | | TPCDS(10) | TPCDS-Q_COUNT_OPTIMIZED | orc | 0.14 | 0.21 | -35.92% | +-----------+---------------------------+---------+--------+-------------+------------+ Testing: - Restore PlannerTest.testOrcStatsAgg - Restore TestAggregationQueriesRunOnce and TestAggregationQueriesRunOnce::test_orc_count_star_optimization - Exercise count(star) in TestOrc::test_misaligned_orc_stripes - Pass core tests Change-Id: I5971c8f278e1dee44e2a8dd4d2f043d22ebf5d17 Reviewed-on: http://gerrit.cloudera.org:8080/19927 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-scan-node-base.cc | 5 +- be/src/exec/hdfs-scan-node-base.h | 12 +- be/src/exec/hdfs-scanner.cc | 4 +- be/src/exec/orc/hdfs-orc-scanner.cc | 45 ++- be/src/exec/orc/hdfs-orc-scanner.h | 1 - be/src/exec/parquet/hdfs-parquet-scanner.cc | 11 +- be/src/exec/parquet/hdfs-parquet-scanner.h | 2 - common/thrift/PlanNodes.thrift | 5 +- .../org/apache/impala/planner/HdfsScanNode.java | 13 +- .../org/apache/impala/planner/PlannerTest.java | 5 + .../queries/PlannerTest/orc-stats-agg.test | 439 +++++++++++++++++++++ .../queries/QueryTest/orc-stats-agg.test | 164 ++++++++ .../queries/QueryTest/partition-key-scans.test | 21 + .../queries/QueryTest/scanners.test | 3 + tests/query_test/test_aggregation.py | 102 ++--- tests/query_test/test_scanners.py | 12 +- 
tests/util/test_file_parser.py | 63 ++- 17 files changed, 809 insertions(+), 98 deletions(-) diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index c1ceb5c66..4ccd656a6 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -451,9 +451,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno hdfs_scan_node.skip_header_line_count : 0), tuple_id_(pnode.tuple_id_), - parquet_count_star_slot_offset_( - hdfs_scan_node.__isset.parquet_count_star_slot_offset ? - hdfs_scan_node.parquet_count_star_slot_offset : + count_star_slot_offset_(hdfs_scan_node.__isset.count_star_slot_offset ? + hdfs_scan_node.count_star_slot_offset : -1), is_partition_key_scan_(hdfs_scan_node.is_partition_key_scan), tuple_desc_(pnode.tuple_desc_), diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 160d4d782..0517b1f3c 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -463,10 +463,8 @@ class HdfsScanNodeBase : public ScanNode { const AvroSchemaElement& avro_schema() const { return avro_schema_; } int skip_header_line_count() const { return skip_header_line_count_; } io::RequestContext* reader_context() const { return reader_context_.get(); } - bool optimize_parquet_count_star() const { - return parquet_count_star_slot_offset_ != -1; - } - int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; } + bool optimize_count_star() const { return count_star_slot_offset_ != -1; } + int count_star_slot_offset() const { return count_star_slot_offset_; } bool is_partition_key_scan() const { return is_partition_key_scan_; } typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>> @@ -686,11 +684,11 @@ class HdfsScanNodeBase : public ScanNode { /// Tuple id of the tuple descriptor to be used. 
const int tuple_id_; - /// The byte offset of the slot for Parquet metadata if Parquet count star optimization + /// The byte offset of the slot for Parquet/ORC metadata if count star optimization /// is enabled. When set, this scan node can optimize a count(*) query by populating /// the tuple with data from the Parquet num rows statistic. See - /// applyParquetCountStartOptimization() in HdfsScanNode.java. - const int parquet_count_star_slot_offset_; + /// applyCountStarOptimization() in ScanNode.java. + const int count_star_slot_offset_; // True if this is a partition key scan that needs only to return at least one row from // each scan range. If true, the scan node and scanner implementations should attempt diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index e17b8d183..ff83ab616 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -1013,6 +1013,8 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node, // Try to find the split with the footer. ScanRange* footer_split = FindFooterSplit(files[i]); + bool footer_scanner = + scan_node->IsZeroSlotTableScan() || scan_node->optimize_count_star(); for (int j = 0; j < files[i]->splits.size(); ++j) { ScanRange* split = files[i]->splits[j]; @@ -1023,7 +1025,7 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node, // groups. We only want a single node to process the file footer in this case, // which is the node with the footer split. If it's not a count(*), we create a // footer range for the split always. 
- if (!scan_node->IsZeroSlotTableScan() || footer_split == split) { + if (!footer_scanner || footer_split == split) { ScanRangeMetadata* split_metadata = static_cast<ScanRangeMetadata*>(split->meta_data()); // Each split is processed by first issuing a scan range for the file footer, which diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc index 6c8cf7822..64369ae97 100644 --- a/be/src/exec/orc/hdfs-orc-scanner.cc +++ b/be/src/exec/orc/hdfs-orc-scanner.cc @@ -406,9 +406,10 @@ Status HdfsOrcScanner::Open(ScannerContext* context) { row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME; } - if (UNLIKELY(scan_node_->optimize_parquet_count_star())) { - DCHECK(false); - return Status("Internal ERROR: ORC scanner cannot optimize count star slot."); + if (scan_node_->optimize_count_star()) { + DCHECK(!row_batches_need_validation_); + template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()]; + return Status::OK(); } // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip @@ -779,10 +780,39 @@ Status HdfsOrcScanner::ProcessSplit() { } Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) { - // In case 'row_batches_need_validation_' is true, we need to look at the row - // batches and check their validity. In that case 'currentTransaction' is the only - // selected field from the file (in case of zero slot scans). - if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) { + if (scan_node_->optimize_count_star()) { + // There are no materialized slots, e.g. count(*) over the table. We can serve + // this query from just the file metadata. We don't need to read the column data. + // Only scanner of the footer split will run in this case. See the logic in + // HdfsScanner::IssueFooterRanges() and HdfsScanNodeBase::ReadsFileMetadataOnly(). 
+ DCHECK(!row_batches_need_validation_); + DCHECK(is_footer_scanner_); + uint64_t file_rows = reader_->getNumberOfRows(); + DCHECK_LT(stripe_rows_read_, file_rows); + COUNTER_ADD(num_file_metadata_read_, 1); + int64_t tuple_buffer_size; + uint8_t* tuple_buffer; + int capacity = 1; + RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state_, + row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), &capacity, + &tuple_buffer_size, &tuple_buffer)); + Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buffer); + InitTuple(template_tuple_, dst_tuple); + int64_t* dst_slot = dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset()); + *dst_slot = file_rows; + TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow()); + dst_row->SetTuple(0, dst_tuple); + row_batch->CommitLastRow(); + stripe_rows_read_ += file_rows; + COUNTER_ADD(scan_node_->rows_read_counter(), file_rows); + eos_ = true; + return Status::OK(); + } else if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) { + // In case 'row_batches_need_validation_' is true, we need to look at the row + // batches and check their validity. In that case 'currentTransaction' is the only + // selected field from the file (in case of zero slot scans). + // This block only handle case when 'row_batches_need_validation_' is false. + DCHECK(is_footer_scanner_); uint64_t file_rows = reader_->getNumberOfRows(); // There are no materialized slots, e.g. count(*) over the table. We can serve // this query from just the file metadata. We don't need to read the column data. 
@@ -790,6 +820,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) { eos_ = true; return Status::OK(); } + DCHECK_LT(stripe_rows_read_, file_rows); COUNTER_ADD(num_file_metadata_read_, 1); assemble_rows_timer_.Start(); DCHECK_LT(stripe_rows_read_, file_rows); diff --git a/be/src/exec/orc/hdfs-orc-scanner.h b/be/src/exec/orc/hdfs-orc-scanner.h index bf52787bc..a066105de 100644 --- a/be/src/exec/orc/hdfs-orc-scanner.h +++ b/be/src/exec/orc/hdfs-orc-scanner.h @@ -195,7 +195,6 @@ class HdfsOrcScanner : public HdfsColumnarScanner { friend class OrcStructReader; friend class OrcListReader; friend class OrcMapReader; - friend class HdfsOrcScannerTest; /// Memory guard of the tuple_mem_ uint8_t* tuple_mem_end_ = nullptr; diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc index 6d41dc147..29df05da0 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.cc +++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc @@ -383,7 +383,7 @@ static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group, int HdfsParquetScanner::CountScalarColumns( const vector<ParquetColumnReader*>& column_readers) { - DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star()); + DCHECK(!column_readers.empty() || scan_node_->optimize_count_star()); int num_columns = 0; stack<ParquetColumnReader*> readers; for (ParquetColumnReader* r: column_readers_) readers.push(r); @@ -433,7 +433,8 @@ Status HdfsParquetScanner::ProcessSplit() { Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { DCHECK(parse_status_.ok()) << parse_status_.GetDetail(); - if (scan_node_->optimize_parquet_count_star()) { + if (scan_node_->optimize_count_star()) { + // This is an optimized count(*) case. // Populate the single slot with the Parquet num rows statistic. 
DCHECK(is_footer_scanner_); int64_t tuple_buf_size; @@ -447,8 +448,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf); TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow()); InitTuple(template_tuple_, dst_tuple); - int64_t* dst_slot = - dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset()); + int64_t* dst_slot = dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset()); *dst_slot = 0; for (const auto &row_group : file_metadata_.row_groups) { *dst_slot += row_group.num_rows; @@ -460,6 +460,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { eos_ = true; return Status::OK(); } else if (scan_node_->IsZeroSlotTableScan()) { + DCHECK(is_footer_scanner_); // There are no materialized slots and we are not optimizing count(*), e.g. // "select 1 from alltypes". We can serve this query from just the file metadata. // We don't need to read the column data. @@ -2836,7 +2837,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc DCHECK(column_readers != nullptr); DCHECK(column_readers->empty()); - if (scan_node_->optimize_parquet_count_star()) { + if (scan_node_->optimize_count_star()) { // Column readers are not needed because we are not reading from any columns if this // optimization is enabled. return Status::OK(); diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h index 5f5c038da..c09aa6a48 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.h +++ b/be/src/exec/parquet/hdfs-parquet-scanner.h @@ -50,7 +50,6 @@ class ParquetColumnReader; class ParquetPageReader; template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> class ScalarColumnReader; -class BoolColumnReader; /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in /// the Impala in-memory representation of data, e.g. (tuples, rows, row batches). 
@@ -400,7 +399,6 @@ class HdfsParquetScanner : public HdfsColumnarScanner { friend class BaseScalarColumnReader; template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> friend class ScalarColumnReader; - friend class BoolColumnReader; friend class HdfsParquetScannerTest; friend class ParquetPageIndex; friend class ParquetColumnChunkReader; diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index cfee32a5a..1c7333571 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -329,9 +329,8 @@ struct THdfsScanNode { // The conjuncts that are eligible for dictionary filtering. 9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts - // The byte offset of the slot for Parquet metadata if Parquet count star optimization - // is enabled. - 10: optional i32 parquet_count_star_slot_offset + // The byte offset of the slot for counter if count star optimization is enabled. + 10: optional i32 count_star_slot_offset // If true, the backend only needs to return one row per partition. 11: optional bool is_partition_key_scan diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 60e4abc04..6dc5590f9 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -91,6 +91,7 @@ import org.apache.impala.thrift.TScanRangeLocationList; import org.apache.impala.thrift.TScanRangeSpec; import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.thrift.TTableStats; +import org.apache.impala.util.AcidUtils; import org.apache.impala.util.BitUtil; import org.apache.impala.util.ExecutorMembershipSnapshot; import org.apache.impala.util.MathUtil; @@ -339,6 +340,8 @@ public class HdfsScanNode extends ScanNode { // Used only to display EXPLAIN information. 
private final List<Expr> partitionConjuncts_; + private boolean isFullAcidTable_ = false; + /** * Construct a node to scan given data files into tuples described by 'desc', * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and @@ -359,6 +362,8 @@ public class HdfsScanNode extends ScanNode { tableNumRowsHint_ = hdfsTblRef.getTableNumRowsHint(); FeFsTable hdfsTable = (FeFsTable)hdfsTblRef.getTable(); Preconditions.checkState(tbl_ == hdfsTable); + isFullAcidTable_ = + AcidUtils.isFullAcidTable(hdfsTable.getMetaStoreTable().getParameters()); StringBuilder error = new StringBuilder(); aggInfo_ = aggInfo; skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error); @@ -403,13 +408,14 @@ public class HdfsScanNode extends ScanNode { } /** - * Returns true if the Parquet count(*) optimization can be applied to the query block + * Returns true if the count(*) optimization can be applied to the query block * of this scan node. */ protected boolean canApplyCountStarOptimization(Analyzer analyzer, Set<HdfsFileFormat> fileFormats) { if (fileFormats.size() != 1) return false; - if (!hasParquet(fileFormats)) return false; + if (isFullAcidTable_) return false; + if (!hasParquet(fileFormats) && !hasOrc(fileFormats)) return false; return canApplyCountStarOptimization(analyzer); } @@ -1896,8 +1902,7 @@ public class HdfsScanNode extends ScanNode { msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_); Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null)); if (countStarSlot_ != null) { - msg.hdfs_scan_node.setParquet_count_star_slot_offset( - countStarSlot_.getByteOffset()); + msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset()); } if (!statsConjuncts_.isEmpty()) { for (Expr e: statsConjuncts_) { diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index b28d489eb..7449a1697 100644 --- 
a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1390,6 +1390,11 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("tpcds-dist-method", "tpcds"); } + @Test + public void testOrcStatsAgg() { + runPlannerTestFile("orc-stats-agg"); + } + /** * Test new hint of 'TABLE_NUM_ROWS' */ diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test new file mode 100644 index 000000000..75265a0b0 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test @@ -0,0 +1,439 @@ +# Verify that the ORC count(*) optimization is applied in all count(*) or +# count(<literal>) cases when scanning an ORC table. In the last case, we are scanning +# a text table, so the optimization is not applied. The optimization is observed when +# the cardinality of the ORC scan (24) is the same as the # of files (24). 
+select count(*) from functional_orc_def.uncomp_src_alltypes +union all +select count(1) from functional_orc_def.uncomp_src_alltypes +union all +select count(123) from functional_orc_def.uncomp_src_alltypes +union all +select count(*) from functional.alltypes +---- PLAN +PLAN-ROOT SINK +| +00:UNION +| pass-through-operands: all +| row-size=8B cardinality=4 +| +|--08:AGGREGATE [FINALIZE] +| | output: count(*) +| | row-size=8B cardinality=1 +| | +| 07:SCAN HDFS [functional.alltypes] +| HDFS partitions=24/24 files=24 size=478.45KB +| row-size=0B cardinality=7.30K +| +|--06:AGGREGATE [FINALIZE] +| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| | row-size=8B cardinality=1 +| | +| 05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] +| HDFS partitions=24/24 files=24 size=205.47KB +| row-size=4B cardinality=24 +| +|--04:AGGREGATE [FINALIZE] +| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| | row-size=8B cardinality=1 +| | +| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] +| HDFS partitions=24/24 files=24 size=205.47KB +| row-size=4B cardinality=24 +| +02:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| row-size=8B cardinality=1 +| +01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=24 +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:UNION +| pass-through-operands: all +| row-size=8B cardinality=4 +| +|--16:AGGREGATE [FINALIZE] +| | output: count:merge(*) +| | row-size=8B cardinality=1 +| | +| 15:EXCHANGE [UNPARTITIONED] +| | +| 08:AGGREGATE +| | output: count(*) +| | row-size=8B cardinality=1 +| | +| 07:SCAN HDFS [functional.alltypes] +| HDFS partitions=24/24 files=24 size=478.45KB +| row-size=0B cardinality=7.30K +| +|--14:AGGREGATE [FINALIZE] +| | output: count:merge(*) +| | row-size=8B cardinality=1 +| | +| 13:EXCHANGE [UNPARTITIONED] +| | +| 06:AGGREGATE +| | 
output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| | row-size=8B cardinality=1 +| | +| 05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] +| HDFS partitions=24/24 files=24 size=205.47KB +| row-size=4B cardinality=24 +| +|--12:AGGREGATE [FINALIZE] +| | output: count:merge(*) +| | row-size=8B cardinality=1 +| | +| 11:EXCHANGE [UNPARTITIONED] +| | +| 04:AGGREGATE +| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| | row-size=8B cardinality=1 +| | +| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] +| HDFS partitions=24/24 files=24 size=205.47KB +| row-size=4B cardinality=24 +| +10:AGGREGATE [FINALIZE] +| output: count:merge(*) +| row-size=8B cardinality=1 +| +09:EXCHANGE [UNPARTITIONED] +| +02:AGGREGATE +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| row-size=8B cardinality=1 +| +01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=24 +==== +# Verify that the ORC count(*) optimization is applied even if there is more than +# one item in the select list. +select count(*), count(1), count(123) from functional_orc_def.uncomp_src_alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=24 +==== +# Select count(<partition col>) - the optimization is disabled because it's not a +# count(<literal>) or count(*) aggregate function. 
+select count(year) from functional_orc_def.uncomp_src_alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(`year`) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=13.07K +==== +# Group by partition columns. +select month, count(*) from functional_orc_def.uncomp_src_alltypes group by month, year +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| group by: `month`, `year` +| row-size=16B cardinality=24 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=16B cardinality=24 +==== +# The optimization is disabled because tinyint_col is not a partition col. +select tinyint_col, count(*) from functional_orc_def.uncomp_src_alltypes group by tinyint_col, year +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| group by: tinyint_col, `year` +| row-size=13B cardinality=13.07K +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=5B cardinality=13.07K +==== +# The optimization is disabled because it can not be applied to the 1st aggregate +# function. +select avg(year), count(*) from functional_orc_def.uncomp_src_alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: avg(`year`), count(*) +| row-size=16B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=13.07K +==== +# Optimization is not applied because the inner count(*) is not materialized. The outer +# count(*) does not reference a base table. 
+select count(*) from (select count(*) from functional_orc_def.uncomp_src_alltypes) t +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=1 +| +01:AGGREGATE [FINALIZE] +| row-size=0B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + partition key scan + row-size=0B cardinality=24 +==== +# The optimization is applied if count(*) is in the having clause. +select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1 +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| having: count(*) > 1 +| row-size=8B cardinality=0 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=24 +==== +# The count(*) optimization is applied in the inline view. +select count(*), count(a) from (select count(1) as a from functional_orc_def.uncomp_src_alltypes) t +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: count(*), count(count(*)) +| row-size=16B cardinality=1 +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=24 +==== +# The count(*) optimization is applied to the inline view even if there is a join. 
+select * +from functional.alltypes x inner join ( + select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year +) t on x.id = t.a; +---- PLAN +PLAN-ROOT SINK +| +03:HASH JOIN [INNER JOIN] +| hash predicates: x.id = count(*) +| runtime filters: RF000 <- count(*) +| row-size=101B cardinality=2 +| +|--02:AGGREGATE [FINALIZE] +| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| | group by: `year` +| | row-size=12B cardinality=2 +| | +| 01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] +| HDFS partitions=24/24 files=24 size=205.47KB +| row-size=12B cardinality=24 +| +00:SCAN HDFS [functional.alltypes x] + HDFS partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> x.id + row-size=89B cardinality=7.30K +==== +# The count(*) optimization is not applied if there is more than 1 table ref. +select count(*) from functional_orc_def.uncomp_src_alltypes a, functional_orc_def.uncomp_src_alltypes b +---- PLAN +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=1 +| +02:NESTED LOOP JOIN [CROSS JOIN] +| row-size=0B cardinality=170.85M +| +|--01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes b] +| HDFS partitions=24/24 files=24 size=205.47KB +| row-size=0B cardinality=13.07K +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes a] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=0B cardinality=13.07K +==== +# The count(*) optimization is applied if all predicates are on partition columns only. 
+select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8; +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + partition predicates: `year` < 2010, `month` > 8 + HDFS partitions=4/24 files=4 size=33.53KB + row-size=8B cardinality=4 +==== +# tinyint_col is not a partition column so the optimization is disabled. +select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and tinyint_col > 8; +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + partition predicates: `year` < 2010 + HDFS partitions=12/24 files=12 size=102.74KB + predicates: tinyint_col > 8 + row-size=1B cardinality=654 +==== +# Optimization is applied after constant folding. +select count(1 + 2 + 3) from functional_orc_def.uncomp_src_alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=4B cardinality=24 +==== +# Optimization is not applied to count(null). 
+select count(1 + null + 3) from functional_orc_def.uncomp_src_alltypes +union all +select count(null) from functional_orc_def.uncomp_src_alltypes +---- PLAN +PLAN-ROOT SINK +| +00:UNION +| pass-through-operands: all +| row-size=8B cardinality=2 +| +|--04:AGGREGATE [FINALIZE] +| | output: count(NULL) +| | row-size=8B cardinality=1 +| | +| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] +| HDFS partitions=24/24 files=24 size=205.47KB +| row-size=0B cardinality=13.07K +| +02:AGGREGATE [FINALIZE] +| output: count(NULL + 3) +| row-size=8B cardinality=1 +| +01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=0B cardinality=13.07K +==== +# Optimization is not applied when selecting from an empty table. +select count(*) from functional_orc_def.emptytable +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=0 +| +00:SCAN HDFS [functional_orc_def.emptytable] + partitions=0/0 files=0 size=0B + row-size=0B cardinality=0 +==== +# Optimization is not applied when all partitions are pruned. +select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1 +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=0 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + partition predicates: `year` = -1 + partitions=0/24 files=0 size=0B + row-size=0B cardinality=0 +==== +# Optimization is not applied across query blocks, even though it would be correct here. +select count(*) from (select int_col from functional_orc_def.uncomp_src_alltypes) t +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=0B cardinality=13.07K +==== +# In general, optimization is not applied when there is a distinct agg. 
+select count(*), count(distinct 1) from functional_orc_def.uncomp_src_alltypes +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: count(1), count:merge(*) +| row-size=16B cardinality=1 +| +01:AGGREGATE +| output: count(*) +| group by: 1 +| row-size=9B cardinality=1 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + HDFS partitions=24/24 files=24 size=205.47KB + row-size=0B cardinality=13.07K +==== +# The optimization is applied here because only the count(*) and a partition column are +# materialized. Non-materialized agg exprs are ignored. +select year, cnt from ( + select year, count(bigint_col), count(*) cnt, avg(int_col) + from functional_orc_def.uncomp_src_alltypes + where month=1 + group by year +) t +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) +| group by: `year` +| row-size=12B cardinality=2 +| +00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] + partition predicates: `month` = 1 + HDFS partitions=2/24 files=2 size=17.07KB + row-size=12B cardinality=2 +==== +# Optimization is not applied when selecting from a full acid table. +select count(*) from functional_orc_def.complextypestbl +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=0 +| +00:SCAN HDFS [functional_orc_def.complextypestbl] + HDFS partitions=1/1 files=2 size=4.04KB + row-size=0B cardinality=2.57K +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test new file mode 100644 index 000000000..db7656cde --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test @@ -0,0 +1,164 @@ +==== +---- QUERY +# Tests the correctness of the ORC count(*) optimization. 
+select count(1) +from functional_orc_def.uncomp_src_alltypes +---- RESULTS +7300 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, NumFileMetadataRead): 24 +aggregation(SUM, RowsRead): 7300 +===== +---- QUERY +# Tests the correctness of zero slot scan over ORC. +# Does not verify 'NumFileMetadataRead' here since codegen vs non-codegen yield +# different number. +select 1 from functional_orc_def.alltypestiny +---- RESULTS +1 +1 +1 +1 +1 +1 +1 +1 +---- TYPES +tinyint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, RowsRead): 8 +===== +---- QUERY +# ORC count(*) optimization with predicates on the partition columns. +select count(1) +from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8 +---- RESULTS +1220 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, NumFileMetadataRead): 4 +aggregation(SUM, RowsRead): 1220 +===== +---- QUERY +# ORC count(*) optimization with group by partition columns. +select year, month, count(1) +from functional_orc_def.uncomp_src_alltypes group by year, month +---- RESULTS +2009,1,310 +2009,2,280 +2009,3,310 +2009,4,300 +2009,5,310 +2009,6,300 +2009,7,310 +2009,8,310 +2009,9,300 +2009,10,310 +2009,11,300 +2009,12,310 +2010,1,310 +2010,2,280 +2010,3,310 +2010,4,300 +2010,5,310 +2010,6,300 +2010,7,310 +2010,8,310 +2010,9,300 +2010,10,310 +2010,11,300 +2010,12,310 +---- TYPES +int, int, bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, NumFileMetadataRead): 24 +aggregation(SUM, RowsRead): 7300 +===== +---- QUERY +# ORC count(*) optimization with both group by and predicates on partition columns. 
+select count(1) +from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8 +group by month +---- RESULTS +310 +300 +310 +300 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, NumFileMetadataRead): 4 +aggregation(SUM, RowsRead): 1220 +===== +---- QUERY +# ORC count(*) optimization with the result going into a join. +select x.bigint_col from functional_orc_def.uncomp_src_alltypes x + inner join ( + select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year + ) t on x.id = t.a; +---- RESULTS +0 +0 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 24 +aggregation(SUM, NumFileMetadataRead): 24 +aggregation(SUM, RowsRead): 14600 +===== +---- QUERY +# ORC count(*) optimization with the agg function in the having clause. +select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1 +---- RESULTS +1 +---- TYPES +tinyint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, NumFileMetadataRead): 24 +aggregation(SUM, RowsRead): 7300 +==== +---- QUERY +# Verify that 0 is returned for count(*) on an empty table. +select count(1) from functional_orc_def.emptytable +---- RESULTS +0 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, NumFileMetadataRead): 0 +aggregation(SUM, RowsRead): 0 +===== +---- QUERY +# Verify that 0 is returned when all partitions are pruned. +select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1 +---- RESULTS +0 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, NumFileMetadataRead): 0 +aggregation(SUM, RowsRead): 0 +===== +---- QUERY +# Verify count star over full acid table. +# NumFileMetadataRead is varied depending on DISABLE_CODEGEN option. 
+select count(*) from functional_orc_def.complextypestbl +---- RESULTS +8 +---- TYPES +bigint +---- RUNTIME_PROFILE +aggregation(SUM, NumOrcStripes): 0 +aggregation(SUM, RowsRead): 8 +===== diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test index 0d9e17330..3e5134378 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test +++ b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test @@ -11,6 +11,9 @@ INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 24 +aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Test with more complex multiple distinct aggregation. @@ -23,6 +26,9 @@ BIGINT,BIGINT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 24 +aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Distinct aggregation with multiple columns. @@ -58,6 +64,9 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 24 +aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Partition key scan combined with analytic function. @@ -71,6 +80,9 @@ INT,BIGINT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 24 +aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Partition scan combined with sort. @@ -107,6 +119,9 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. 
aggregation(SUM, RowsRead): 24 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 24 +aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Partition key scan combined with predicate on partition columns @@ -121,6 +136,9 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 2 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 2 +aggregation(SUM, NumFileMetadataRead): 2 ==== ---- QUERY # Partition key scan combined with having predicate. @@ -136,6 +154,9 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 24 +aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Empty table should not return any rows diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test index eff43d53d..6f58d56de 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test +++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test @@ -238,6 +238,9 @@ tinyint 1 ---- RUNTIME_PROFILE aggregation(SUM, RowsRead): 100 +---- RUNTIME_PROFILE: table_format=parquet,orc +aggregation(SUM, RowsRead): 100 +aggregation(SUM, RowsReturned): 200 ==== ---- QUERY select year, count(*) from alltypes group by year diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py index c7498ec06..aa7cfdb35 100644 --- a/tests/query_test/test_aggregation.py +++ b/tests/query_test/test_aggregation.py @@ -261,24 +261,6 @@ class TestAggregationQueries(ImpalaTestSuite): # Verify codegen was enabled for all four stages of the aggregation. 
assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6]) - def test_parquet_count_star_optimization(self, vector, unique_database): - if (vector.get_value('table_format').file_format != 'text' or - vector.get_value('table_format').compression_codec != 'none'): - # No need to run this test on all file formats - pytest.skip() - self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) - vector.get_value('exec_option')['batch_size'] = 1 - self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) - - def test_kudu_count_star_optimization(self, vector, unique_database): - if (vector.get_value('table_format').file_format != 'text' or - vector.get_value('table_format').compression_codec != 'none'): - # No need to run this test on all file formats - pytest.skip() - self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database) - vector.get_value('exec_option')['batch_size'] = 1 - self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database) - def test_ndv(self): """Test the version of NDV() that accepts a scale value argument against different column data types. 
The scale argument is an integer in range @@ -322,17 +304,70 @@ class TestAggregationQueries(ImpalaTestSuite): for j in range(0, 11): assert(ndv_results[i - 1][j] == int(ndv_vals[j])) - def test_sampled_ndv(self, vector, unique_database): + def test_grouping_sets(self, vector): + """Tests for ROLLUP, CUBE and GROUPING SETS.""" + if vector.get_value('table_format').file_format == 'hbase': + pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent") + self.run_test_case('QueryTest/grouping-sets', vector) + + def test_aggregation_limit(self, vector): + """Test that limits are honoured when enforced by aggregation node.""" + # 1-phase + result = self.execute_query( + "select distinct l_orderkey from tpch.lineitem limit 10", + vector.get_value('exec_option')) + assert len(result.data) == 10 + + # 2-phase with transpose + result = self.execute_query( + "select count(distinct l_discount), group_concat(distinct l_linestatus), " + "max(l_quantity) from tpch.lineitem group by l_tax, l_shipmode limit 10;", + vector.get_value('exec_option')) + assert len(result.data) == 10 + + +class TestAggregationQueriesRunOnce(ImpalaTestSuite): + """Run the aggregation test suite similarly as TestAggregationQueries, but with stricter + constraint. Each test in this class only run once by setting uncompressed text dimension + for all exploration strategy. However, they may not necessarily target uncompressed text + table format. 
This also run with codegen enabled and disabled to exercise our + non-codegen code""" + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestAggregationQueriesRunOnce, cls).add_test_dimensions() + + cls.ImpalaTestMatrix.add_dimension( + create_exec_option_dimension(disable_codegen_options=[False, True])) + + cls.ImpalaTestMatrix.add_dimension( + create_uncompressed_text_dimension(cls.get_workload())) + + def test_parquet_count_star_optimization(self, vector, unique_database): + self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) + vector.get_value('exec_option')['batch_size'] = 1 + self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) + + def test_kudu_count_star_optimization(self, vector): + self.run_test_case('QueryTest/kudu-stats-agg', vector) + vector.get_value('exec_option')['batch_size'] = 1 + self.run_test_case('QueryTest/kudu-stats-agg', vector) + + def test_orc_count_star_optimization(self, vector): + self.run_test_case('QueryTest/orc-stats-agg', vector) + vector.get_value('exec_option')['batch_size'] = 1 + self.run_test_case('QueryTest/orc-stats-agg', vector) + + def test_sampled_ndv(self, vector): """The SAMPLED_NDV() function is inherently non-deterministic and cannot be reasonably made deterministic with existing options so we test it separately. The goal of this test is to ensure that SAMPLED_NDV() works on all data types and returns approximately sensible estimates. It is not the goal of this test to ensure tight error bounds on the NDV estimates. SAMPLED_NDV() is expected be inaccurate on small data sets like the ones we use in this test.""" - if (vector.get_value('table_format').file_format != 'text' or - vector.get_value('table_format').compression_codec != 'none'): - # No need to run this test on all file formats - pytest.skip() # NDV() is used a baseline to compare SAMPLED_NDV(). 
Both NDV() and SAMPLED_NDV() # are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() can do. @@ -386,27 +421,6 @@ class TestAggregationQueries(ImpalaTestSuite): for i in range(14, 16): self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0) - def test_grouping_sets(self, vector): - """Tests for ROLLUP, CUBE and GROUPING SETS.""" - if vector.get_value('table_format').file_format == 'hbase': - pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent") - self.run_test_case('QueryTest/grouping-sets', vector) - - def test_aggregation_limit(self, vector): - """Test that limits are honoured when enforced by aggregation node.""" - # 1-phase - result = self.execute_query( - "select distinct l_orderkey from tpch.lineitem limit 10", - vector.get_value('exec_option')) - assert len(result.data) == 10 - - # 2-phase with transpose - result = self.execute_query( - "select count(distinct l_discount), group_concat(distinct l_linestatus), " - "max(l_quantity) from tpch.lineitem group by l_tax, l_shipmode limit 10;", - vector.get_value('exec_option')) - assert len(result.data) == 10 - class TestDistinctAggregation(ImpalaTestSuite): """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 88e9abf83..daa163504 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -1682,7 +1682,8 @@ class TestOrc(ImpalaTestSuite): def _misaligned_orc_stripes_helper( self, table_name, rows_in_table, num_scanners_with_no_reads=0): """Checks if 'num_scanners_with_no_reads' indicates the expected number of scanners - that don't read anything because the underlying file is poorly formatted + that don't read anything because the underlying file is poorly formatted. + Additionally, test that select count(star) match with expected number of rows. 
""" query = 'select * from %s' % table_name result = self.client.execute(query) @@ -1703,6 +1704,11 @@ class TestOrc(ImpalaTestSuite): total += int(n) assert total == num_scanners_with_no_reads + # Test that select count(star) match with expected number of rows. + query = 'select count(*) from %s' % table_name + result = self.client.execute(query) + assert int(result.data[0]) == rows_in_table + # Skip this test on non-HDFS filesystems, because orc-type-check.test contains Hive # queries that hang in some cases (IMPALA-9345). It would be possible to separate # the tests that use Hive and run most tests on S3, but I think that running these on @@ -1836,13 +1842,13 @@ class TestOrc(ImpalaTestSuite): "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC", unique_database, test_name, test_files) err = self.execute_query_expect_failure(self.client, - "select count(*) from {0}.{1}".format(unique_database, test_name)) + "select count(id) from {0}.{1}".format(unique_database, test_name)) assert expected_error in str(err) def test_invalid_schema(self, vector, unique_database): """Test scanning of ORC file with malformed schema.""" self._run_invalid_schema_test(unique_database, "corrupt_schema", - "Encountered parse error during schema selection") + "Encountered parse error in tail of ORC file") self._run_invalid_schema_test(unique_database, "corrupt_root_type", "Root of the selected type returned by the ORC lib is not STRUCT: boolean.") diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py index a5ddb9670..8bd2973d4 100644 --- a/tests/util/test_file_parser.py +++ b/tests/util/test_file_parser.py @@ -34,6 +34,8 @@ LOG = logging.getLogger('impala_test_suite') # constants SECTION_DELIMITER = "====" SUBSECTION_DELIMITER = "----" +ALLOWED_TABLE_FORMATS = ['kudu', 'parquet', 'orc'] + # The QueryTestSectionReader provides utility functions that help to parse content # from a query test file @@ -73,16 +75,18 @@ class QueryTestSectionReader(object): elif 
table_format.compression_codec == 'none': suffix = '_%s' % (table_format.file_format) elif table_format.compression_type == 'record': - suffix = '_%s_record_%s' % (table_format.file_format, + suffix = '_%s_record_%s' % (table_format.file_format, table_format.compression_codec) else: - suffix = '_%s_%s' % (table_format.file_format, table_format.compression_codec) + suffix = '_%s_%s' % (table_format.file_format, table_format.compression_codec) dataset = table_format.dataset.replace('-', '') return dataset + scale_factor + suffix def remove_comments(section_text): - return '\n'.join([l for l in section_text.split('\n') if not l.strip().startswith('#')]) + lines = [line for line in section_text.split('\n') if not line.strip().startswith('#')] + return '\n'.join(lines) + def parse_query_test_file(file_name, valid_section_names=None, encoding=None): """ @@ -102,6 +106,7 @@ def parse_query_test_file(file_name, valid_section_names=None, encoding=None): return parse_test_file(file_name, section_names, encoding=encoding, skip_unknown_sections=False) + def parse_table_constraints(constraints_file): """Reads a table constraints file, if one exists""" schema_include = defaultdict(list) @@ -134,11 +139,13 @@ def parse_table_constraints(constraints_file): raise ValueError('Unknown constraint type: %s' % constraint_type) return schema_include, schema_exclude, schema_only + def parse_table_format_constraint(table_format_constraint): # TODO: Expand how we parse table format constraints to support syntax such as # a table format string with a wildcard character. Right now we don't do anything. 
return table_format_constraint + def parse_test_file(test_file_name, valid_section_names, skip_unknown_sections=True, encoding=None): """ @@ -169,6 +176,24 @@ def parse_test_file(test_file_name, valid_section_names, skip_unknown_sections=T return parse_test_file_text(file_data, valid_section_names, skip_unknown_sections) + +def parse_runtime_profile_table_formats(subsection_comment): + prefix = "table_format=" + if not subsection_comment.startswith(prefix): + raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the form ' + '"table_format=FORMAT[,FORMAT2,...]"' % subsection_comment) + + parsed_formats = subsection_comment[len(prefix):].split(',') + table_formats = list() + for table_format in parsed_formats: + if table_format not in ALLOWED_TABLE_FORMATS: + raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' % + (table_format, ALLOWED_TABLE_FORMATS)) + else: + table_formats.append(table_format) + return table_formats + + def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True): sections = list() section_start_regex = re.compile(r'(?m)^%s' % SECTION_DELIMITER) @@ -199,8 +224,11 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True): subsection_comment = None subsection_info = [s.strip() for s in subsection_name.split(':')] - if(len(subsection_info) == 2): + if len(subsection_info) == 2: subsection_name, subsection_comment = subsection_info + if subsection_comment == "": + # ignore empty subsection_comment + subsection_comment = None lines_content = lines[1:-1] @@ -237,7 +265,7 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True): if subsection_name == 'CATCH': parsed_sections['CATCH'] = list() - if subsection_comment == None: + if subsection_comment is None: parsed_sections['CATCH'].append(subsection_str) elif subsection_comment == 'ANY_OF': parsed_sections['CATCH'].extend(lines_content) @@ -254,28 +282,24 @@ def parse_test_file_text(text, valid_section_names, 
skip_unknown_sections=True): # will be verified against the DML_RESULTS. Using both DML_RESULTS and RESULTS is # not supported. if subsection_name == 'DML_RESULTS': - if subsection_comment is None or subsection_comment == '': - raise RuntimeError('DML_RESULTS requires that the table is specified ' \ + if subsection_comment is None: + raise RuntimeError('DML_RESULTS requires that the table is specified ' 'in the comment.') parsed_sections['DML_RESULTS_TABLE'] = subsection_comment parsed_sections['VERIFIER'] = 'VERIFY_IS_EQUAL_SORTED' # The RUNTIME_PROFILE section is used to specify lines of text that should be # present in the query runtime profile. It takes an option comment containing a - # table format. RUNTIME_PROFILE secions with a comment are only evaluated for the + # table format. RUNTIME_PROFILE sections with a comment are only evaluated for the # specified format. If there is a RUNTIME_PROFILE section without a comment, it is # evaluated for all formats that don't have a commented section for this query. 
if subsection_name == 'RUNTIME_PROFILE': - if subsection_comment is not None and subsection_comment is not "": - allowed_formats = ['kudu'] - if not subsection_comment.startswith("table_format="): - raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the form ' - '"table_format=FORMAT"' % subsection_comment) - table_format = subsection_comment[13:] - if table_format not in allowed_formats: - raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' % - (table_format, allowed_formats)) - subsection_name = 'RUNTIME_PROFILE_%s' % table_format + if subsection_comment: + table_formats = parse_runtime_profile_table_formats(subsection_comment) + for table_format in table_formats: + subsection_name_for_format = 'RUNTIME_PROFILE_%s' % table_format + parsed_sections[subsection_name_for_format] = subsection_str + continue parsed_sections[subsection_name] = subsection_str @@ -283,6 +307,7 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True): sections.append(parsed_sections) return sections + def split_section_lines(section_str): """ Given a section string as produced by parse_test_file_text(), split it into separate @@ -294,6 +319,7 @@ def split_section_lines(section_str): # Trim off the trailing newline and split into lines. return section_str[:-1].split('\n') + def join_section_lines(lines): """ The inverse of split_section_lines(). @@ -303,6 +329,7 @@ def join_section_lines(lines): """ return '\n'.join(lines) + '\n' + def write_test_file(test_file_name, test_file_sections, encoding=None): """ Given a list of test file sections, write out the corresponding test file
