This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3dfebca9b16d7cf4ced40f7efac5d05ac5fe51d9 Author: Csaba Ringhofer <[email protected]> AuthorDate: Fri May 12 15:39:45 2023 +0200 IMPALA-12138: Optimize HS2 result vector allocations Before this patch, the reservation sizes were based on the number of rows in the RowBatches - as batch_size has a lower default than fetch_size (1024 vs 10240), one fetch is served by multiple row batches, leading to reserving vectors in more than one step. This patch changes the logic to: - reserve during the first fetch the old way - reserve fetch_size in subsequent fetches This means that queries with small result sets should not regress, while in large ones only the first and the last fetches will be suboptimal. Also noticed that the current default fetch_size=10240 in impala-shell is not optimal for RowMaterializationTimer, probably because it is not a power of 2 and leads to overallocation. Created IMPALA-12142 for the potential default fetch_size change. Tested with select * from tpch_parquet.lineitem, and RowMaterializationTimer decreased by around 10-20%: fetch_size=10240: 3.6s -> 3.2s fetch_size=8192: 2.8s -> 2.6s Change-Id: I7b0e6a0a8fd028e3c0e4f1f4e272a50d2bfb59ba Reviewed-on: http://gerrit.cloudera.org:8080/19879 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/hs2-util.cc | 42 ++++++++++++++++++------------------- be/src/service/hs2-util.h | 4 +++- be/src/service/impala-hs2-server.cc | 11 ++++++++-- be/src/service/query-result-set.cc | 21 +++++++++++++------ be/src/service/query-result-set.h | 3 ++- 5 files changed, 50 insertions(+), 31 deletions(-) diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc index 3d36876cb..57dde3d99 100644 --- a/be/src/service/hs2-util.cc +++ b/be/src/service/hs2-util.cc @@ -143,13 +143,12 @@ void impala::TColumnValueToHS2TColumn(const TColumnValue& col_val, // Helper to reserve space in hs2Vals->values and hs2Vals->nulls for the values that the // 
different implementations of ExprValuesToHS2TColumn will write. template <typename T> -void ReserveSpace(int num_rows, uint32_t output_row_idx, T* hs2Vals) { - DCHECK_GE(num_rows, 0); - int64_t num_output_rows = output_row_idx + num_rows; - int64_t num_null_bytes = BitUtil::RoundUpNumBytes(num_output_rows); +void ReserveSpace(int reserve_count, T* hs2Vals) { + DCHECK_GE(reserve_count, 0); + int64_t num_null_bytes = BitUtil::RoundUpNumBytes(reserve_count); // Round up reserve() arguments to power-of-two to avoid accidentally quadratic // behaviour from repeated small increases in size. - hs2Vals->values.reserve(BitUtil::RoundUpToPowerOfTwo(num_output_rows)); + hs2Vals->values.reserve(BitUtil::RoundUpToPowerOfTwo(reserve_count)); hs2Vals->nulls.reserve(BitUtil::RoundUpToPowerOfTwo(num_null_bytes)); } @@ -157,7 +156,6 @@ void ReserveSpace(int num_rows, uint32_t output_row_idx, T* hs2Vals) { static void BoolExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->boolVal); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { BooleanVal val = expr_eval->GetBooleanVal(it.Get()); column->boolVal.values.push_back(val.val); @@ -170,7 +168,6 @@ static void BoolExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* static void TinyIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->byteVal); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { TinyIntVal val = expr_eval->GetTinyIntVal(it.Get()); column->byteVal.values.push_back(val.val); @@ -183,7 +180,6 @@ static void TinyIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBat static void SmallIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, 
RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->i16Val); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { SmallIntVal val = expr_eval->GetSmallIntVal(it.Get()); column->i16Val.values.push_back(val.val); @@ -196,7 +192,6 @@ static void SmallIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, static void IntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->i32Val); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { DCHECK_EQ(output_row_idx, column->i32Val.values.size()); IntVal val = expr_eval->GetIntVal(it.Get()); @@ -210,7 +205,6 @@ static void IntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* static void BigIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->i64Val); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { BigIntVal val = expr_eval->GetBigIntVal(it.Get()); column->i64Val.values.push_back(val.val); @@ -223,7 +217,6 @@ static void BigIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatc static void FloatExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->doubleVal); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { FloatVal val = expr_eval->GetFloatVal(it.Get()); column->doubleVal.values.push_back(val.val); @@ -236,7 +229,6 @@ static void FloatExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch static void 
DoubleExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->doubleVal); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { DoubleVal val = expr_eval->GetDoubleVal(it.Get()); column->doubleVal.values.push_back(val.val); @@ -249,7 +241,6 @@ static void DoubleExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatc static void TimestampExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->stringVal); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { TimestampVal val = expr_eval->GetTimestampVal(it.Get()); column->stringVal.values.emplace_back(); @@ -266,7 +257,6 @@ static void TimestampExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, static void DateExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->stringVal); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { DateVal val = expr_eval->GetDateVal(it.Get()); column->stringVal.values.emplace_back(); @@ -299,7 +289,6 @@ static void StringExprValuesToHS2TColumnHelper(ScalarExprEvaluator* expr_eval, static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->stringVal); StringExprValuesToHS2TColumnHelper( expr_eval, batch, start_idx, num_rows, output_row_idx, column->stringVal.values, column->stringVal.nulls); @@ -310,7 +299,6 @@ static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* 
expr_eval, RowBatc static void BinaryExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->binaryVal); StringExprValuesToHS2TColumnHelper( expr_eval, batch, start_idx, num_rows, output_row_idx, column->binaryVal.values, column->binaryVal.nulls); @@ -321,7 +309,6 @@ static void BinaryExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatc static void CharExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->stringVal); ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { StringVal val = expr_eval->GetStringVal(it.Get()); @@ -339,7 +326,6 @@ static void CharExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, static void DecimalExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(num_rows, output_row_idx, &column->stringVal); FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { DecimalVal val = expr_eval->GetDecimalVal(it.Get()); const ColumnType& decimalType = ColumnType::FromThrift(type); @@ -373,7 +359,6 @@ static void StructExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, uint32_t output_row_idx, bool stringify_map_keys, apache::hive::service::cli::thrift::TColumn* column) { DCHECK(type.types.size() > 1); - ReserveSpace(num_rows, output_row_idx, &column->stringVal); // The buffer used by rapidjson::Writer. We reuse it to eliminate allocations. 
rapidjson::StringBuffer buffer; FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { @@ -413,7 +398,6 @@ static void CollectionExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, PrimitiveType coll_impala_type = coll_thrift_type == TTypeNodeType::ARRAY ? PrimitiveType::TYPE_ARRAY : PrimitiveType::TYPE_MAP; - ReserveSpace(num_rows, output_row_idx, &column->stringVal); // The buffer used by rapidjson::Writer. We reuse it to eliminate allocations. rapidjson::StringBuffer buffer; FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { @@ -446,7 +430,7 @@ static void CollectionExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, // For V6 and above void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, - uint32_t output_row_idx, bool stringify_map_keys, + uint32_t output_row_idx, int expected_result_count, bool stringify_map_keys, apache::hive::service::cli::thrift::TColumn* column) { // Dispatch to a templated function for the loop over rows. This avoids branching on // the type for every row. @@ -454,69 +438,85 @@ void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, // to inline the expression evaluation into the loop body. 
switch (type.types[0].type) { case TTypeNodeType::STRUCT: + ReserveSpace(expected_result_count, &column->stringVal); StructExprValuesToHS2TColumn(expr_eval, type, batch, start_idx, num_rows, output_row_idx, stringify_map_keys, column); return; case TTypeNodeType::ARRAY: case TTypeNodeType::MAP: + ReserveSpace(expected_result_count, &column->stringVal); CollectionExprValuesToHS2TColumn(expr_eval, type, batch, start_idx, num_rows, output_row_idx, stringify_map_keys, column); return; default: break; } + switch (type.types[0].scalar_type.type) { case TPrimitiveType::NULL_TYPE: case TPrimitiveType::BOOLEAN: + ReserveSpace(expected_result_count, &column->boolVal); BoolExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::TINYINT: + ReserveSpace(expected_result_count, &column->byteVal); TinyIntExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::SMALLINT: + ReserveSpace(expected_result_count, &column->i16Val); SmallIntExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::INT: + ReserveSpace(expected_result_count, &column->i32Val); IntExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::BIGINT: + ReserveSpace(expected_result_count, &column->i64Val); BigIntExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::FLOAT: + ReserveSpace(expected_result_count, &column->doubleVal); FloatExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::DOUBLE: + ReserveSpace(expected_result_count, &column->doubleVal); DoubleExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::DATE: + ReserveSpace(expected_result_count, &column->stringVal); 
DateExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); break; case TPrimitiveType::TIMESTAMP: + ReserveSpace(expected_result_count, &column->stringVal); TimestampExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::STRING: case TPrimitiveType::VARCHAR: + ReserveSpace(expected_result_count, &column->stringVal); StringExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::BINARY: + ReserveSpace(expected_result_count, &column->binaryVal); BinaryExprValuesToHS2TColumn( expr_eval, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::CHAR: + ReserveSpace(expected_result_count, &column->stringVal); CharExprValuesToHS2TColumn( expr_eval, type, batch, start_idx, num_rows, output_row_idx, column); return; case TPrimitiveType::DECIMAL: { + ReserveSpace(expected_result_count, &column->stringVal); DecimalExprValuesToHS2TColumn( expr_eval, type, batch, start_idx, num_rows, output_row_idx, column); return; diff --git a/be/src/service/hs2-util.h b/be/src/service/hs2-util.h index 50618e4a6..f8096153c 100644 --- a/be/src/service/hs2-util.h +++ b/be/src/service/hs2-util.h @@ -37,10 +37,12 @@ void TColumnValueToHS2TColumn(const TColumnValue& col_val, const TColumnType& ty /// 'column' with 'type' starting at output_row_idx. The caller is responsible for /// calling RuntimeState::GetQueryStatus() to check for expression evaluation errors. If /// 'stringify_map_keys' is true, converts map keys to strings; see IMPALA-11778. +/// 'expected_result_count' is used for reserving space in the result vectors. 
/// For V6-> void ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, - bool stringify_map_keys, apache::hive::service::cli::thrift::TColumn* column); + int expected_result_count, bool stringify_map_keys, + apache::hive::service::cli::thrift::TColumn* column); /// For V1->V5 void TColumnValueToHS2TColumnValue(const TColumnValue& col_val, const TColumnType& type, diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index e2c01ac2e..2eea07e1d 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -222,9 +222,16 @@ Status ImpalaServer::FetchInternal(TUniqueId query_id, SessionState* session, bool is_child_query = query_handle->parent_query_id() != TUniqueId(); TProtocolVersion::type version = is_child_query ? TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version; + + // In the first fetch, expect 0 results to avoid reserving unnecessarily large result + // vectors for small queries. If there are more fetches (so there are more rows than + // num_rows_fetched), then expect subsequent fetches to be fully filled. + int expected_result_count = query_handle->num_rows_fetched() == 0 ? 
0 + : fetch_size; + scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet( version, *(query_handle->result_metadata()), &(fetch_results->results), - query_handle->query_options().stringify_map_keys)); + query_handle->query_options().stringify_map_keys, expected_result_count)); RETURN_IF_ERROR( query_handle->FetchRows(fetch_size, result_set.get(), block_on_wait_time_us)); *num_results = result_set->size(); @@ -552,7 +559,7 @@ Status ImpalaServer::SetupResultsCacheing(const QueryHandle& query_handle, const TResultSetMetadata* result_set_md = query_handle->result_metadata(); QueryResultSet* result_set = QueryResultSet::CreateHS2ResultSet(session->hs2_version, *result_set_md, nullptr, - query_handle->query_options().stringify_map_keys); + query_handle->query_options().stringify_map_keys, 0); RETURN_IF_ERROR(query_handle->SetResultCache(result_set, cache_num_rows)); } return Status::OK(); diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc index dd7a2b2f0..e4a828709 100644 --- a/be/src/service/query-result-set.cc +++ b/be/src/service/query-result-set.cc @@ -94,7 +94,7 @@ class AsciiQueryResultSet : public QueryResultSet { class HS2ColumnarResultSet : public QueryResultSet { public: HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset, - bool stringify_map_keys); + bool stringify_map_keys, int expected_result_count); virtual ~HS2ColumnarResultSet() {} @@ -130,6 +130,10 @@ class HS2ColumnarResultSet : public QueryResultSet { // If true, converts map keys to strings; see IMPALA-11778. const bool stringify_map_keys_; + // Expected number of result rows that will be returned with this + // fetch request. Used to reserve results vector memory. 
+ const int expected_result_count_; + void InitColumns(); }; @@ -174,11 +178,12 @@ QueryResultSet* QueryResultSet::CreateAsciiQueryResultSet( QueryResultSet* QueryResultSet::CreateHS2ResultSet( TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset, - bool stringify_map_keys) { + bool stringify_map_keys, int expected_result_count) { if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) { return new HS2RowOrientedResultSet(metadata, rowset); } else { - return new HS2ColumnarResultSet(metadata, rowset, stringify_map_keys); + return new HS2ColumnarResultSet( + metadata, rowset, stringify_map_keys, expected_result_count); } } @@ -342,9 +347,11 @@ uint32_t TColumnByteSize(const ThriftTColumn& col, uint32_t start_idx, uint32_t // Result set container for Hive protocol versions >= V6, where results are returned in // column-orientation. HS2ColumnarResultSet::HS2ColumnarResultSet( - const TResultSetMetadata& metadata, TRowSet* rowset, bool stringify_map_keys) + const TResultSetMetadata& metadata, TRowSet* rowset, bool stringify_map_keys, + int expected_result_count) : metadata_(metadata), result_set_(rowset), num_rows_(0), - stringify_map_keys_(stringify_map_keys) { + stringify_map_keys_(stringify_map_keys), + expected_result_count_(expected_result_count) { if (rowset == NULL) { owned_result_set_.reset(new TRowSet()); result_set_ = owned_result_set_.get(); @@ -355,13 +362,15 @@ HS2ColumnarResultSet::HS2ColumnarResultSet( Status HS2ColumnarResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch, int start_idx, int num_rows) { DCHECK_GE(batch->num_rows(), start_idx + num_rows); + int expected_result_count = + std::max((int64_t) expected_result_count_, num_rows + num_rows_); int num_col = expr_evals.size(); DCHECK_EQ(num_col, metadata_.columns.size()); for (int i = 0; i < num_col; ++i) { const TColumnType& type = metadata_.columns[i].columnType; ScalarExprEvaluator* expr_eval = expr_evals[i]; 
ExprValuesToHS2TColumn(expr_eval, type, batch, start_idx, num_rows, num_rows_, - stringify_map_keys_, &(result_set_->columns[i])); + expected_result_count, stringify_map_keys_, &(result_set_->columns[i])); } num_rows_ += num_rows; return Status::OK(); diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h index 60621ffa0..7c2fa10a3 100644 --- a/be/src/service/query-result-set.h +++ b/be/src/service/query-result-set.h @@ -79,7 +79,8 @@ class QueryResultSet { static QueryResultSet* CreateHS2ResultSet( apache::hive::service::cli::thrift::TProtocolVersion::type version, const TResultSetMetadata& metadata, - apache::hive::service::cli::thrift::TRowSet* rowset, bool stringify_map_keys); + apache::hive::service::cli::thrift::TRowSet* rowset, bool stringify_map_keys, + int expected_result_count); protected: /// Wrapper to call ComplexValueWriter::CollectionValueToJSON() or
