This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 705989d [improvement](VHashJoin) add probe timer (#8233) 705989d is described below commit 705989d23916ce115b6ed269221f7be377b74a24 Author: awakeljw <993007...@qq.com> AuthorDate: Sun Mar 13 20:54:44 2022 +0800 [improvement](VHashJoin) add probe timer (#8233) --- be/src/vec/exec/join/vhash_join_node.cpp | 217 ++++++++++++++++++------------- be/src/vec/exec/join/vhash_join_node.h | 3 + 2 files changed, 127 insertions(+), 93 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index c33bcb2..a1af769 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -166,8 +166,56 @@ struct ProcessHashTableProbe { _items_counts(join_node->_items_counts), _build_block_offsets(join_node->_build_block_offsets), _build_block_rows(join_node->_build_block_rows), - _rows_returned_counter(join_node->_rows_returned_counter) {} + _rows_returned_counter(join_node->_rows_returned_counter), + _search_hashtable_timer(join_node->_search_hashtable_timer), + _build_side_output_timer(join_node->_build_side_output_timer), + _probe_side_output_timer(join_node->_probe_side_output_timer) {} + + // output build side result column + void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, int size) { + constexpr auto is_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN; + constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; + + if constexpr (!is_semi_anti_join) { + if (_build_blocks.size() == 1) { + for (int i = 0; i < column_length; i++) { + auto& column = *_build_blocks[0].get_by_position(i).column; + mcol[i + column_offset]->insert_indices_from(column, + _build_block_rows.data(), _build_block_rows.data() + size); + } + } else { + for (int i = 0; i < column_length; i++) { + for (int j = 0; j < size; j++) { + if constexpr (probe_all) { + if (_build_block_offsets[j] == -1) { + DCHECK(mcol[i + column_offset]->is_nullable()); + assert_cast<ColumnNullable *>(mcol[i + column_offset].get())->insert_join_null_data(); + } else { + auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; + mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + } + } else { + auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; + mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + } + } + } + } + } + } + + // output probe side result column + void probe_side_output_column(MutableColumns& mcol, int column_length, int size) { + for (int i = 0; i < column_length; ++i) { + auto& column = _probe_block.get_by_position(i).column; + column->replicate(&_items_counts[0], size, *mcol[i]); + } + } // Only process the join with no other join conjunt, because of no other join conjunt // the output block struct is same with mutable block. we can do more opt on it and simplify // the logic of probe @@ -198,116 +246,93 @@ struct ProcessHashTableProbe { constexpr auto is_right_semi_anti_join = JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType::value == TJoinOp::RIGHT_SEMI_JOIN; - constexpr auto is_semi_anti_join = is_right_semi_anti_join || - JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN; - constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN; - for (; _probe_index < _probe_rows;) { - if constexpr (ignore_null) { - if ((*null_map)[_probe_index]) { - _items_counts[_probe_index++] = (uint32_t)0; - continue; - } - } - int last_offset = current_offset; - auto find_result = (*null_map)[_probe_index] - ? decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index, - _arena)) {nullptr, false} - : key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena); - - if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { - if (!find_result.is_found()) { - ++current_offset; - } - } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { - if (find_result.is_found()) { - ++current_offset; + { + SCOPED_TIMER(_search_hashtable_timer); + for (; _probe_index < _probe_rows;) { + if constexpr (ignore_null) { + if ((*null_map)[_probe_index]) { + _items_counts[_probe_index++] = (uint32_t)0; + continue; + } } - } else { - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.get_row_count() == 1) { - if constexpr (need_to_set_visited) - mapped.visited = true; - - if constexpr (!is_right_semi_anti_join) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - ++current_offset; - } - } else { - // prefetch is more useful while matching to multiple rows - if (_probe_index + 2 < _probe_rows) - key_getter.prefetch(hash_table_ctx.hash_table, _probe_index + 2, _arena); + int last_offset = current_offset; + auto find_result = (*null_map)[_probe_index] + ? decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index, + _arena)) {nullptr, false} + : key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena); + + if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) { + if (!find_result.is_found()) { + ++current_offset; + } + } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN) { + if (find_result.is_found()) { + ++current_offset; + } + } else { + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + if constexpr (need_to_set_visited) + mapped.visited = true; - for (auto it = mapped.begin(); it.ok(); ++it) { if constexpr (!is_right_semi_anti_join) { - if (current_offset < _batch_size) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; ++current_offset; } - if constexpr (need_to_set_visited) - it->visited = true; + } else { + // prefetch is more useful while matching to multiple rows + if (_probe_index + 2 < _probe_rows) + key_getter.prefetch(hash_table_ctx.hash_table, _probe_index + 2, _arena); + + for (auto it = mapped.begin(); it.ok(); ++it) { + if constexpr (!is_right_semi_anti_join) { + if (current_offset < _batch_size) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + } + if constexpr (need_to_set_visited) + it->visited = true; + } + } + } else { + if constexpr (probe_all) { + // only full outer / left outer need insert the data of right table + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + ++current_offset; } - } - } else { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - ++current_offset; } } - } - _items_counts[_probe_index++] = (uint32_t)(current_offset - last_offset); - if (current_offset >= _batch_size) { - break; + _items_counts[_probe_index++] = (uint32_t)(current_offset - last_offset); + if (current_offset >= _batch_size) { + break; + } } } - // insert all matched build rows - if constexpr (!is_semi_anti_join) { - if (_build_blocks.size() == 1) { - for (int i = 0; i < right_col_len; i++) { - auto& column = *_build_blocks[0].get_by_position(i).column; - mcol[i + right_col_idx]->insert_indices_from(column, - _build_block_rows.data(), _build_block_rows.data() + current_offset); - } - } else { - for (int i = 0; i < right_col_len; i++) { - for (int j = 0; j < current_offset; j++) { - if constexpr (probe_all) { - if (_build_block_offsets[j] == -1) { - DCHECK(mcol[i + right_col_idx]->is_nullable()); - assert_cast<ColumnNullable *>(mcol[i + right_col_idx].get())->insert_join_null_data(); - } else { - auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; - mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]); - } - } else { - auto& column = *_build_blocks[_build_block_offsets[j]].get_by_position(i).column; - mcol[i + right_col_idx]->insert_from(column, _build_block_rows[j]); - } - } - } - } + { + SCOPED_TIMER(_build_side_output_timer); + build_side_output_column(mcol, right_col_idx, right_col_len, current_offset); } - for (int i = 0; i < right_col_idx; ++i) { - auto& column = _probe_block.get_by_position(i).column; - column->replicate(&_items_counts[0], current_offset, *mcol[i]); + { + SCOPED_TIMER(_probe_side_output_timer); + probe_side_output_column(mcol, right_col_idx, current_offset); } + output_block->swap(mutable_block.to_block()); return Status::OK(); @@ -581,6 +606,9 @@ private: std::vector<int>& _build_block_rows; ProfileCounter* _rows_returned_counter; + ProfileCounter* _search_hashtable_timer; + ProfileCounter* _build_side_output_timer; + ProfileCounter* _probe_side_output_timer; }; // now we only support inner join @@ -697,6 +725,9 @@ Status HashJoinNode::prepare(RuntimeState* state) { _probe_next_timer = ADD_TIMER(probe_phase_profile, "ProbeFindNextTime"); _probe_expr_call_timer = ADD_TIMER(probe_phase_profile, "ProbeExprCallTime"); _probe_rows_counter = ADD_COUNTER(probe_phase_profile, "ProbeRows", TUnit::UNIT); + _search_hashtable_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenSearchHashTableTime"); + _build_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenBuildSideOutputTime"); + _probe_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenProbeSideOutputTime"); _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index b50d11f..ca93aaa 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -195,6 +195,9 @@ private: RuntimeProfile::Counter* _push_compute_timer; RuntimeProfile::Counter* _build_rows_counter; RuntimeProfile::Counter* _probe_rows_counter; + RuntimeProfile::Counter* _search_hashtable_timer; + RuntimeProfile::Counter* _build_side_output_timer; + RuntimeProfile::Counter* _probe_side_output_timer; int64_t _hash_table_rows; int64_t _mem_used; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org