github-actions[bot] commented on code in PR #33945: URL: https://github.com/apache/doris/pull/33945#discussion_r1574287414
##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -231,36 +236,33 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
 
-Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request, size_t idx) {
+Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
Review Comment:
warning: function 'prepare' exceeds recommended size/complexity thresholds [readability-function-size]
```cpp
^
```

<details>
<summary>Additional context</summary>

**be/src/pipeline/pipeline_fragment_context.cpp:238:** 105 lines including whitespace and comments (threshold 80)
```cpp
^
```

</details>

##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -481,260 +605,838 @@
 }
 
 // TODO: use virtual function to do abstract
-Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) {
-    auto node_type = node->type();
-    switch (node_type) {
-    // for source
-    case TPlanNodeType::OLAP_SCAN_NODE:
-    case TPlanNodeType::JDBC_SCAN_NODE:
-    case TPlanNodeType::ODBC_SCAN_NODE:
-    case TPlanNodeType::FILE_SCAN_NODE:
-    case TPlanNodeType::META_SCAN_NODE:
-    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE:
-    case TPlanNodeType::ES_HTTP_SCAN_NODE:
-    case TPlanNodeType::ES_SCAN_NODE: {
-        OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
-        break;
-    }
-    case TPlanNodeType::MYSQL_SCAN_NODE: {
-#ifdef DORIS_WITH_MYSQL
-        OperatorBuilderPtr operator_t =
-                std::make_shared<MysqlScanOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
-        break;
-#else
+Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool,
+                                                 const doris::TPipelineFragmentParams& request,
+                                                 const DescriptorTbl& descs, OperatorXPtr* root,
+                                                 PipelinePtr cur_pipe) {
+    if (request.fragment.plan.nodes.empty()) {
+        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
+    }
+
+    int node_idx = 0;
+
+    cur_pipe->_name.append(std::to_string(cur_pipe->id()));
+
+    RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, request, descs, nullptr,
+                                        &node_idx, root, cur_pipe, 0));
+
+    if (node_idx + 1 != request.fragment.plan.nodes.size()) {
+        // TODO: print thrift msg for diagnostic purposes.
         return Status::InternalError(
-                "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
-#endif
+                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
     }
-    case TPlanNodeType::SCHEMA_SCAN_NODE: {
-        OperatorBuilderPtr operator_t =
-                std::make_shared<SchemaScanOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
-        break;
+    return Status::OK();
+}
+
+Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
+                                                    const std::vector<TPlanNode>& tnodes,
+                                                    const doris::TPipelineFragmentParams& request,
+                                                    const DescriptorTbl& descs, OperatorXPtr parent,
+                                                    int* node_idx, OperatorXPtr* root,
+                                                    PipelinePtr& cur_pipe, int child_idx) {
+    // propagate error case
+    if (*node_idx >= tnodes.size()) {
+        // TODO: print thrift msg
+        return Status::InternalError(
+                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
+                *node_idx, tnodes.size());
     }
-    case TPlanNodeType::EXCHANGE_NODE: {
-        OperatorBuilderPtr operator_t =
-                std::make_shared<ExchangeSourceOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
+    const TPlanNode& tnode = tnodes[*node_idx];
+
+    int num_children = tnodes[*node_idx].num_children;
+    OperatorXPtr op = nullptr;
+    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
+                                     parent == nullptr ? -1 : parent->node_id(), child_idx));
+
+    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
+    if (parent != nullptr) {
+        // add to parent's child(s)
+        RETURN_IF_ERROR(parent->set_child(op));
+    } else {
+        *root = op;
+    }
+
+    cur_pipe->_name.push_back('-');
+    cur_pipe->_name.append(std::to_string(op->id()));
+    cur_pipe->_name.append(op->get_name());
+
+    // rely on tnodes being a preorder traversal of the plan
+    for (int i = 0; i < num_children; i++) {
+        ++*node_idx;
+        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
+                                            cur_pipe, i));
+
+        // we are expecting a child, but have used all nodes
+        // this means we have been given a bad tree and must fail
+        if (*node_idx >= tnodes.size()) {
+            // TODO: print thrift msg
+            return Status::InternalError(
+                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
+                    *node_idx, tnodes.size());
+        }
+    }
+
+    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
+
+    return Status::OK();
+}
+
+void PipelineFragmentContext::_inherit_pipeline_properties(
+        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
+        PipelinePtr pipe_with_sink) {
+    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
+    pipe_with_source->set_num_tasks(_num_instances);
+    pipe_with_source->set_data_distribution(data_distribution);
+}
+
+Status PipelineFragmentContext::_add_local_exchange_impl(
Review Comment:
warning: function '_add_local_exchange_impl' exceeds recommended size/complexity thresholds [readability-function-size]
```cpp
^
```

<details>
<summary>Additional context</summary>

**be/src/pipeline/pipeline_fragment_context.cpp:691:** 115 lines including whitespace and comments (threshold 80)
```cpp
^
```

</details>

##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -481,260 +605,838 @@
 }
 
 // TODO: use virtual function to do abstract
-Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) {
-    auto node_type = node->type();
-    switch (node_type) {
-    // for source
-    case TPlanNodeType::OLAP_SCAN_NODE:
-    case TPlanNodeType::JDBC_SCAN_NODE:
-    case TPlanNodeType::ODBC_SCAN_NODE:
-    case TPlanNodeType::FILE_SCAN_NODE:
-    case TPlanNodeType::META_SCAN_NODE:
-    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE:
-    case TPlanNodeType::ES_HTTP_SCAN_NODE:
-    case TPlanNodeType::ES_SCAN_NODE: {
-        OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
-        break;
-    }
-    case TPlanNodeType::MYSQL_SCAN_NODE: {
-#ifdef DORIS_WITH_MYSQL
-        OperatorBuilderPtr operator_t =
-                std::make_shared<MysqlScanOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
-        break;
-#else
+Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool,
+                                                 const doris::TPipelineFragmentParams& request,
+                                                 const DescriptorTbl& descs, OperatorXPtr* root,
+                                                 PipelinePtr cur_pipe) {
+    if (request.fragment.plan.nodes.empty()) {
+        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
+    }
+
+    int node_idx = 0;
+
+    cur_pipe->_name.append(std::to_string(cur_pipe->id()));
+
+    RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, request, descs, nullptr,
+                                        &node_idx, root, cur_pipe, 0));
+
+    if (node_idx + 1 != request.fragment.plan.nodes.size()) {
+        // TODO: print thrift msg for diagnostic purposes.
         return Status::InternalError(
-                "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
-#endif
+                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
     }
-    case TPlanNodeType::SCHEMA_SCAN_NODE: {
-        OperatorBuilderPtr operator_t =
-                std::make_shared<SchemaScanOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
-        break;
+    return Status::OK();
+}
+
+Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
+                                                    const std::vector<TPlanNode>& tnodes,
+                                                    const doris::TPipelineFragmentParams& request,
+                                                    const DescriptorTbl& descs, OperatorXPtr parent,
+                                                    int* node_idx, OperatorXPtr* root,
+                                                    PipelinePtr& cur_pipe, int child_idx) {
+    // propagate error case
+    if (*node_idx >= tnodes.size()) {
+        // TODO: print thrift msg
+        return Status::InternalError(
+                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
+                *node_idx, tnodes.size());
     }
-    case TPlanNodeType::EXCHANGE_NODE: {
-        OperatorBuilderPtr operator_t =
-                std::make_shared<ExchangeSourceOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
+    const TPlanNode& tnode = tnodes[*node_idx];
+
+    int num_children = tnodes[*node_idx].num_children;
+    OperatorXPtr op = nullptr;
+    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
+                                     parent == nullptr ? -1 : parent->node_id(), child_idx));
+
+    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
+    if (parent != nullptr) {
+        // add to parent's child(s)
+        RETURN_IF_ERROR(parent->set_child(op));
+    } else {
+        *root = op;
+    }
+
+    cur_pipe->_name.push_back('-');
+    cur_pipe->_name.append(std::to_string(op->id()));
+    cur_pipe->_name.append(op->get_name());
+
+    // rely on tnodes being a preorder traversal of the plan
+    for (int i = 0; i < num_children; i++) {
+        ++*node_idx;
+        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
+                                            cur_pipe, i));
+
+        // we are expecting a child, but have used all nodes
+        // this means we have been given a bad tree and must fail
+        if (*node_idx >= tnodes.size()) {
+            // TODO: print thrift msg
+            return Status::InternalError(
+                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
+                    *node_idx, tnodes.size());
+        }
+    }
+
+    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
+
+    return Status::OK();
+}
+
+void PipelineFragmentContext::_inherit_pipeline_properties(
+        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
+        PipelinePtr pipe_with_sink) {
+    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
+    pipe_with_source->set_num_tasks(_num_instances);
+    pipe_with_source->set_data_distribution(data_distribution);
+}
+
+Status PipelineFragmentContext::_add_local_exchange_impl(
+        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
+        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
+        const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
+        const bool ignore_data_hash_distribution) {
+    auto& operator_xs = cur_pipe->operator_xs();
+    const auto downstream_pipeline_id = cur_pipe->id();
+    auto local_exchange_id = next_operator_id();
+    // 1. Create a new pipeline with local exchange sink.
+    DataSinkOperatorXPtr sink;
+    auto sink_id = next_sink_operator_id();
+    const bool is_shuffled_hash_join = operator_xs.size() > idx
+                                               ? operator_xs[idx]->is_shuffled_hash_join()
+                                               : cur_pipe->sink_x()->is_shuffled_hash_join();
+    sink.reset(new LocalExchangeSinkOperatorX(
+            sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances,
+            data_distribution.partition_exprs, bucket_seq_to_instance_idx));
+    RETURN_IF_ERROR(new_pip->set_sink(sink));
+    RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets,
+                                            is_shuffled_hash_join, shuffle_idx_to_instance_idx));
+
+    // 2. Create and initialize LocalExchangeSharedState.
+    auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
+    switch (data_distribution.distribution_type) {
+    case ExchangeType::HASH_SHUFFLE:
+        shared_state->exchanger = ShuffleExchanger::create_unique(
+                std::max(cur_pipe->num_tasks(), _num_instances),
+                is_shuffled_hash_join ? _total_instances : _num_instances);
         break;
+    case ExchangeType::BUCKET_HASH_SHUFFLE:
+        shared_state->exchanger = BucketShuffleExchanger::create_unique(
+                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
+                ignore_data_hash_distribution);
+        break;
+    case ExchangeType::PASSTHROUGH:
+        shared_state->exchanger =
+                PassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        break;
+    case ExchangeType::BROADCAST:
+        shared_state->exchanger =
+                BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        break;
+    case ExchangeType::PASS_TO_ONE:
+        shared_state->exchanger =
+                BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        break;
+    case ExchangeType::ADAPTIVE_PASSTHROUGH:
+        shared_state->exchanger =
+                AdaptivePassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        break;
+    default:
+        return Status::InternalError("Unsupported local exchange type : " +
+                                     std::to_string((int)data_distribution.distribution_type));
+    }
+    auto sink_dep = std::make_shared<Dependency>(sink_id, local_exchange_id,
+                                                 "LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
+                                                 _runtime_state->get_query_ctx());
+    sink_dep->set_shared_state(shared_state.get());
+    shared_state->sink_deps.push_back(sink_dep);
+    _op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
+
+    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
+    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
+
+    // 3.1 Initialize new pipeline's operator list.
+    std::copy(operator_xs.begin(), operator_xs.begin() + idx,
+              std::inserter(new_pip->operator_xs(), new_pip->operator_xs().end()));
+
+    // 3.2 Erase unused operators in previous pipeline.
+    operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
+
+    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
+    OperatorXPtr source_op;
+    source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
+    RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back()));
+    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
+    if (!operator_xs.empty()) {
+        RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
+    }
+    operator_xs.insert(operator_xs.begin(), source_op);
+
+    shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id(),
+                                             _query_ctx.get());
+
+    // 5. Set children for two pipelines separately.
+    std::vector<std::shared_ptr<Pipeline>> new_children;
+    std::vector<PipelineId> edges_with_source;
+    for (auto child : cur_pipe->children()) {
+        bool found = false;
+        for (auto op : new_pip->operator_xs()) {
+            if (child->sink_x()->node_id() == op->node_id()) {
+                new_pip->set_children(child);
+                found = true;
+            };
+        }
+        if (!found) {
+            new_children.push_back(child);
+            edges_with_source.push_back(child->id());
+        }
     }
-    case TPlanNodeType::EMPTY_SET_NODE: {
-        OperatorBuilderPtr operator_t =
-                std::make_shared<EmptySetSourceOperatorBuilder>(node->id(), node);
-        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
+    new_children.push_back(new_pip);
+    edges_with_source.push_back(new_pip->id());
+
+    // 6. Set DAG for new pipelines.
+    if (!new_pip->children().empty()) {
+        std::vector<PipelineId> edges_with_sink;
+        for (auto child : new_pip->children()) {
+            edges_with_sink.push_back(child->id());
+        }
+        _dag.insert({new_pip->id(), edges_with_sink});
+    }
+    cur_pipe->set_children(new_children);
+    _dag[downstream_pipeline_id] = edges_with_source;
+    RETURN_IF_ERROR(new_pip->sink_x()->set_child(new_pip->operator_xs().back()));
+    RETURN_IF_ERROR(cur_pipe->sink_x()->set_child(cur_pipe->operator_xs().back()));
+
+    // 7. Inherit properties from current pipeline.
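+    // (Despite the name, properties flow both ways: new_pip, which now ends in the local
+    // exchange sink, takes over cur_pipe's task count, while cur_pipe is reset to
+    // _num_instances tasks and the new data distribution.)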
+    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
+    return Status::OK();
+}
+
+Status PipelineFragmentContext::_add_local_exchange(
+        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
+        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
+        const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
+        const bool ignore_data_distribution) {
+    DCHECK(_enable_local_shuffle());
+    if (_num_instances <= 1) {
+        return Status::OK();
+    }
+
+    if (!cur_pipe->need_to_local_exchange(data_distribution)) {
+        return Status::OK();
+    }
+    *do_local_exchange = true;
+
+    auto& operator_xs = cur_pipe->operator_xs();
+    auto total_op_num = operator_xs.size();
+    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
+    RETURN_IF_ERROR(_add_local_exchange_impl(
+            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
+
+    CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size())
+            << "total_op_num: " << total_op_num
+            << " cur_pipe->operator_xs().size(): " << cur_pipe->operator_xs().size()
+            << " new_pip->operator_xs().size(): " << new_pip->operator_xs().size();
+
+    // Add passthrough local exchanger if necessary
+    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
+        Pipeline::is_hash_exchange(data_distribution.distribution_type)) {
+        RETURN_IF_ERROR(_add_local_exchange_impl(
+                new_pip->operator_xs().size(), pool, new_pip, add_pipeline(new_pip, pip_idx + 2),
+                DataDistribution(ExchangeType::PASSTHROUGH), do_local_exchange, num_buckets,
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, ignore_data_distribution));
+    }
+    return Status::OK();
+}
+
+Status PipelineFragmentContext::_plan_local_exchange(
+        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
+    for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
+        _pipelines[pip_idx]->init_data_distribution();
+        // Set property if child pipeline is not join operator's child.
+        if (!_pipelines[pip_idx]->children().empty()) {
+            for (auto& child : _pipelines[pip_idx]->children()) {
+                if (child->sink_x()->node_id() ==
+                    _pipelines[pip_idx]->operator_xs().front()->node_id()) {
+                    RETURN_IF_ERROR(_pipelines[pip_idx]->operator_xs().front()->set_child(
+                            child->operator_xs().back()));
+                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
+                }
+            }
+        }
+
+        RETURN_IF_ERROR(_plan_local_exchange(
+                _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution()
+                        ? _num_instances
+                        : num_buckets,
+                pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
+                shuffle_idx_to_instance_idx,
+                _pipelines[pip_idx]->operator_xs().front()->ignore_data_hash_distribution()));
+    }
+    return Status::OK();
+}
+
+Status PipelineFragmentContext::_plan_local_exchange(
+        int num_buckets, int pip_idx, PipelinePtr pip,
+        const std::map<int, int>& bucket_seq_to_instance_idx,
+        const std::map<int, int>& shuffle_idx_to_instance_idx,
+        const bool ignore_data_hash_distribution) {
+    int idx = 1;
+    bool do_local_exchange = false;
+    do {
+        auto& ops = pip->operator_xs();
+        do_local_exchange = false;
+        // Plan local exchange for each operator.
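+        // idx starts at 1: ops[0] is this pipeline's source operator, so a local
+        // exchange can only be planned in front of ops[1] and later.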
+        for (; idx < ops.size();) {
+            if (ops[idx]->required_data_distribution().need_local_exchange()) {
+                RETURN_IF_ERROR(_add_local_exchange(
+                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
+                        ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
+                        bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                        ignore_data_hash_distribution));
+            }
+            if (do_local_exchange) {
+                // If a local exchange is needed for the current operator, we split this pipeline
+                // into two pipelines by a local exchange sink/source pair. Then we need to process
+                // the remaining operators in this pipeline, so we set idx to 2 (0 is the local
+                // exchange source and 1 is the current operator, which was already processed) and
+                // continue to plan local exchange.
+                idx = 2;
+                break;
+            }
+            idx++;
+        }
+    } while (do_local_exchange);
+    if (pip->sink_x()->required_data_distribution().need_local_exchange()) {
+        RETURN_IF_ERROR(_add_local_exchange(
+                pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip,
+                pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets,
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
+                ignore_data_hash_distribution));
+    }
+    return Status::OK();
+}
+
+Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
Review Comment:
warning: function '_create_data_sink' exceeds recommended size/complexity thresholds [readability-function-size]
```cpp
^
```

<details>
<summary>Additional context</summary>

**be/src/pipeline/pipeline_fragment_context.cpp:920:** 139 lines including whitespace and comments (threshold 80)
```cpp
^
```

</details>
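All three warnings above come from the same clang-tidy check, which in this configuration flags raw line count (threshold 80; `prepare` is 105 lines, `_add_local_exchange_impl` 115, `_create_data_sink` 139). The usual remedy is the pattern this file already leans on: extract each cohesive step into a helper that returns `Status` and chain the helpers with `RETURN_IF_ERROR`. The sketch below only illustrates that pattern; `Status`, `RETURN_IF_ERROR`, and the step helpers are minimal stand-ins, not Doris code.

```cpp
#include <string>
#include <utility>

// Minimal stand-in for Doris's Status type, for illustration only.
struct Status {
    bool ok_ = true;
    std::string msg_;
    static Status OK() { return {}; }
    static Status InternalError(std::string msg) { return {false, std::move(msg)}; }
    bool ok() const { return ok_; }
};

// Minimal stand-in for Doris's RETURN_IF_ERROR macro: bail out on the first failure.
#define RETURN_IF_ERROR(stmt)                \
    do {                                     \
        Status _status_ = (stmt);            \
        if (!_status_.ok()) return _status_; \
    } while (false)

// Hypothetical helpers: each one would hold what was previously an inline
// "step N" block of an oversized function, keeping every function under the
// 80-line threshold.
Status init_runtime_state() { return Status::OK(); }
Status build_pipeline_tree() { return Status::OK(); }
Status plan_local_exchanges() { return Status::OK(); }

// The former 100+ line function reduces to a readable sequence of steps.
Status prepare_fragment() {
    RETURN_IF_ERROR(init_runtime_state());
    RETURN_IF_ERROR(build_pipeline_tree());
    RETURN_IF_ERROR(plan_local_exchanges());
    return Status::OK();
}

int main() {
    return prepare_fragment().ok() ? 0 : 1;
}
```

If a function cannot reasonably be split further, the thresholds can instead be tuned through the check's options (for example `readability-function-size.LineThreshold` in `.clang-tidy`), or an individual function can be exempted with a `// NOLINT(readability-function-size)` comment.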