xiaokang commented on code in PR #18654: URL: https://github.com/apache/doris/pull/18654#discussion_r1191798635
########## be/src/exec/tablet_info.h: ########## @@ -263,11 +263,17 @@ struct NodeInfo { class DorisNodesInfo { public: + DorisNodesInfo() = default; DorisNodesInfo(const TPaloNodesInfo& t_nodes) { for (auto& node : t_nodes.nodes) { _nodes.emplace(node.id, node); } } + void setNodes(const TPaloNodesInfo& t_nodes) { Review Comment: set should do clear first. ########## be/src/exec/rowid_fetcher.cpp: ########## @@ -133,20 +180,51 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids, _stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback); } counter.wait(); - RETURN_IF_ERROR(MergeRPCResults(resps, cntls, &mblock)); - // final sort by row_ids sequence, since row_ids is already sorted - vectorized::Block tmp = mblock.to_block(); - std::unordered_map<std::string, uint32_t> row_order; - vectorized::ColumnPtr row_id_column = tmp.get_columns().back(); - for (size_t x = 0; x < row_id_column->size(); ++x) { + + // Merge + std::vector<RowId> rows_id; + rows_id.reserve(rows_id.size()); + RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block, &rows_id)); + + // Final sort by row_ids sequence, since row_ids is already sorted if need + std::map<GlobalRowLoacation, size_t> positions; + for (size_t i = 0; i < rows_id.size(); ++i) { + RowsetId rowset_id; + rowset_id.init(rows_id[i].rowset_id()); + GlobalRowLoacation grl(rows_id[i].tablet_id(), rowset_id, rows_id[i].segment_id(), + rows_id[i].ordinal_id()); + positions[grl] = i; + }; + vectorized::IColumn::Permutation permutation; + permutation.reserve(column_row_ids->size()); + for (size_t i = 0; i < column_row_ids->size(); ++i) { auto location = - reinterpret_cast<const GlobalRowLoacation*>(row_id_column->get_data_at(x).data); - row_order[format_rowid(*location)] = x; + reinterpret_cast<const GlobalRowLoacation*>(column_row_ids->get_data_at(i).data); + permutation.push_back(positions[*location]); + } + size_t num_rows = res_block->rows(); + for (size_t i = 0; i < res_block->columns(); ++i) { 
+ res_block->get_by_position(i).column = + res_block->get_by_position(i).column->permute(permutation, num_rows); } - for (size_t x = 0; x < row_ids->size(); ++x) { - auto location = reinterpret_cast<const GlobalRowLoacation*>(row_ids->get_data_at(x).data); - res_block->add_row(&tmp, row_order[format_rowid(*location)]); + // shrink for char type + std::vector<size_t> char_type_idx; + for (size_t i = 0; i < _fetch_option.desc->slots().size(); i++) { + auto column_desc = _fetch_option.desc->slots()[i]; + auto type_desc = column_desc->type(); + do { + if (type_desc.type == TYPE_CHAR) { + char_type_idx.emplace_back(i); + break; + } else if (type_desc.type != TYPE_ARRAY) { + break; + } + // for Array<Char> or Array<Array<Char>> + type_desc = type_desc.children[0]; + } while (true); } + res_block->shrink_char_type_column_suffix_zero(char_type_idx); Review Comment: What's the purpose to shrink suffix zero? If there are more than one char column, only one is shrinked here. ########## be/src/exec/rowid_fetcher.cpp: ########## @@ -65,20 +73,18 @@ Status RowIDFetcher::init(DorisNodesInfo* nodes_info) { return Status::OK(); } -static std::string format_rowid(const GlobalRowLoacation& location) { - return fmt::format("{} {} {} {}", location.tablet_id, - location.row_location.rowset_id.to_string(), - location.row_location.segment_id, location.row_location.row_id); -} - -PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_ids) { +PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_ids) const { PMultiGetRequest mget_req; - _tuple_desc->to_protobuf(mget_req.mutable_desc()); - for (auto slot : _tuple_desc->slots()) { + _fetch_option.desc->to_protobuf(mget_req.mutable_desc()); + for (SlotDescriptor* slot : _fetch_option.desc->slots()) { + // ignore rowid + if (slot->col_name() == BeConsts::ROWID_COL) { + continue; + } slot->to_protobuf(mget_req.add_slots()); } for (size_t i = 0; i < row_ids.size(); ++i) { - 
PMultiGetRequest::RowId row_id; + RowId row_id; StringRef row_id_rep = row_ids.get_data_at(i); auto location = reinterpret_cast<const GlobalRowLoacation*>(row_id_rep.data); Review Comment: This is unsafe across BEs with different CPU architectures. ########## gensrc/thrift/DataSinks.thrift: ########## @@ -122,9 +122,19 @@ struct TDataStreamSink { 3: optional bool ignore_not_found } + +struct TFetchOption { + 1: optional bool use_two_phase_fetch; + // Nodes in this cluster, used for second phase fetch + 2: optional Descriptors.TPaloNodesInfo nodes_info; + // Whether fetch row store + 3: optional bool fetch_row_store; Review Comment: Should the use of row_store be judged by FE or BE? ########## be/src/olap/tablet.cpp: ########## @@ -2481,7 +2481,7 @@ Status Tablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segi Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, RowsetSharedPtr input_rowset, const TupleDescriptor* desc, - OlapReaderStatistics& stats, vectorized::Block* block, + OlapReaderStatistics& stats, std::string& values, Review Comment: What's the purpose of using a string instead of a block? ########## fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java: ########## @@ -474,17 +482,35 @@ private void injectRowIdColumnSlot() { node = node.getChildren().get(0); } - if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) { - continue; + // case1 + if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) { + SortNode sortNode = (SortNode) parent; + OlapScanNode scanNode = (OlapScanNode) node; + SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc()); + injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor()); Review Comment: It's better to encapsulate injectRowIdColumnSlot and sortNode.getResolvedTupleExprs().add(extSlot) into a method of SortNode. 
########## be/src/service/internal_service.cpp: ########## @@ -1422,11 +1439,26 @@ static Status read_by_rowids( continue; } segment_v2::SegmentSharedPtr segment = *it; - for (int x = 0; x < desc.slots().size() - 1; ++x) { Review Comment: Is the -1 missing in the new code? ########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -195,6 +197,45 @@ public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator this.statsErrorEstimator = statsErrorEstimator; } + // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n] + // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes + // in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows + // and reconconstruct the final block + private void setResultSinkFetchOptionIfNeed() { + boolean needFetch = false; + // Only single olap table should be fetched + OlapTable fetchOlapTable = null; + for (PlanFragment fragment : context.getPlanFragments()) { Review Comment: There is no guarantee that the query has this simple shape. 
########## be/src/service/internal_service.cpp: ########## @@ -1384,15 +1386,30 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( } } -static Status read_by_rowids( - std::pair<size_t, size_t> row_range_idx, const TupleDescriptor& desc, - const google::protobuf::RepeatedPtrField<PMultiGetRequest_RowId>& rowids, - vectorized::Block* sub_block) { - //read from row_range.first to row_range.second - for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) { +Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, + PMultiGetResponse* response) { + OlapReaderStatistics stats; + vectorized::Block result_block; + + // init desc + TupleDescriptor desc(request.desc()); + std::vector<SlotDescriptor> slots; + slots.reserve(request.slots().size()); + for (const auto& pslot : request.slots()) { + slots.push_back(SlotDescriptor(pslot)); + desc.add_slot(&slots.back()); Review Comment: Is there no slot in tuple desc? ########## fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java: ########## @@ -63,4 +86,18 @@ public PlanNodeId getExchNodeId() { public DataPartition getOutputPartition() { return null; } + + /** + * Set the parameters used to fetch data by rowid column + * after init(). + */ + private TPaloNodesInfo createNodesInfo() { + TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); + for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) { Review Comment: The nodesInfo is static: if a BE goes offline, or a new BE comes online, the sink node is not notified of the change. 
########## be/src/service/internal_service.cpp: ########## @@ -1384,15 +1386,30 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( } } -static Status read_by_rowids( - std::pair<size_t, size_t> row_range_idx, const TupleDescriptor& desc, - const google::protobuf::RepeatedPtrField<PMultiGetRequest_RowId>& rowids, - vectorized::Block* sub_block) { - //read from row_range.first to row_range.second - for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) { +Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, + PMultiGetResponse* response) { + OlapReaderStatistics stats; + vectorized::Block result_block; + + // init desc + TupleDescriptor desc(request.desc()); + std::vector<SlotDescriptor> slots; + slots.reserve(request.slots().size()); + for (const auto& pslot : request.slots()) { + slots.push_back(SlotDescriptor(pslot)); + desc.add_slot(&slots.back()); + } + + // read row by row + for (size_t i = 0; i < request.rowids_size(); ++i) { + const auto& row_id = request.rowids(i); Review Comment: In fact, row id is row location. So the names row_loc, row_locs are better. 
########## be/src/exec/rowid_fetcher.cpp: ########## @@ -87,43 +93,84 @@ PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnStrin row_id.set_ordinal_id(location->row_location.row_id); *mget_req.add_rowids() = std::move(row_id); } - mget_req.set_be_exec_version(_st->be_exec_version()); + PUniqueId& query_id = *mget_req.mutable_query_id(); + query_id.set_hi(_fetch_option.runtime_state->query_id().hi); + query_id.set_lo(_fetch_option.runtime_state->query_id().lo); + mget_req.set_be_exec_version(_fetch_option.runtime_state->be_exec_version()); + mget_req.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store); return mget_req; } static void fetch_callback(bthread::CountdownEvent* counter) { Defer __defer([&] { counter->signal(); }); } -static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps, - const std::vector<brpc::Controller>& cntls, - vectorized::MutableBlock* output_block) { +Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, + const std::vector<PMultiGetResponse>& rsps, + const std::vector<brpc::Controller>& cntls, + vectorized::Block* output_block, + std::vector<RowId>* rows_id) const { + output_block->clear(); for (const auto& cntl : cntls) { if (cntl.Failed()) { LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText() << ", host:" << cntl.remote_side(); return Status::InternalError(cntl.ErrorText()); } } - for (const auto& resp : rsps) { + + auto merge_function = [&](const PMultiGetResponse& resp) { Status st(resp.status()); if (!st.ok()) { LOG(WARNING) << "Failed to fetch " << st.to_string(); return st; } + for (const RowId& row_id : resp.row_ids()) { + rows_id->push_back(row_id); + } + // Merge binary rows + if (request.fetch_row_store()) { + if (output_block->is_empty_column()) { + *output_block = vectorized::Block(_fetch_option.desc->slots(), 1); + } + for (int i = 0; i < resp.binary_row_data_size(); ++i) { + vectorized::JsonbSerializeUtil::jsonb_to_block( + 
*_fetch_option.desc, resp.binary_row_data(i).data(), + resp.binary_row_data(i).size(), *output_block); + } + return Status::OK(); + } + // Merge partial blocks vectorized::Block partial_block(resp.block()); Review Comment: DCHECK resp.row_ids().size() == partial_block.size() ########## be/src/vec/sink/vresult_sink.cpp: ########## @@ -100,9 +111,31 @@ Status VResultSink::open(RuntimeState* state) { return VExpr::open(_output_vexpr_ctxs, state); } +Status VResultSink::second_phase_fetch_data(RuntimeState* state, Block* final_block) { + auto row_id_col = final_block->get_by_position(final_block->columns() - 1); Review Comment: add DCHECK to ensure it's row id column actually. ########## fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java: ########## @@ -43,13 +51,28 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { strBuilder.append("V"); } strBuilder.append("RESULT SINK\n"); + if (fetchOption != null) { + strBuilder.append(prefix).append(" ").append("OPT TWO PHASE\n"); + if (fetchOption.isFetchRowStore()) { + strBuilder.append(prefix).append(" ").append("FETCH ROW STORE\n"); + } + } return strBuilder.toString(); } + public void setFetchOption(TFetchOption fetchOption) { + this.fetchOption = fetchOption; + } + @Override protected TDataSink toThrift() { TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK); TResultSink tResultSink = new TResultSink(); + if (fetchOption != null) { + fetchOption.setUseTwoPhaseFetch(true); Review Comment: It's not easy to understand to set fields of fetchOption in two locations, Planner and SinkNode. 
########## be/src/exec/rowid_fetcher.cpp: ########## @@ -48,11 +52,15 @@ #include "vec/common/assert_cast.h" #include "vec/common/string_ref.h" #include "vec/core/block.h" // Block +#include "vec/data_types/data_type_factory.hpp" +#include "vec/jsonb/serialize.h" namespace doris { -Status RowIDFetcher::init(DorisNodesInfo* nodes_info) { - for (auto [node_id, node_info] : nodes_info->nodes_info()) { +Status RowIDFetcher::init() { + DorisNodesInfo nodes_info; + nodes_info.setNodes(_fetch_option.t_fetch_opt.nodes_info); + for (auto [node_id, node_info] : nodes_info.nodes_info()) { Review Comment: Some be may be not necessary to connect if there is no rows on it. ########## be/src/service/internal_service.cpp: ########## @@ -1384,15 +1386,30 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset( } } -static Status read_by_rowids( - std::pair<size_t, size_t> row_range_idx, const TupleDescriptor& desc, - const google::protobuf::RepeatedPtrField<PMultiGetRequest_RowId>& rowids, - vectorized::Block* sub_block) { - //read from row_range.first to row_range.second - for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) { +Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request, + PMultiGetResponse* response) { + OlapReaderStatistics stats; + vectorized::Block result_block; + + // init desc + TupleDescriptor desc(request.desc()); + std::vector<SlotDescriptor> slots; + slots.reserve(request.slots().size()); + for (const auto& pslot : request.slots()) { + slots.push_back(SlotDescriptor(pslot)); + desc.add_slot(&slots.back()); + } + + // read row by row + for (size_t i = 0; i < request.rowids_size(); ++i) { + const auto& row_id = request.rowids(i); MonotonicStopWatch watch; watch.start(); - auto row_id = rowids[i]; + Defer _defer([&]() { + LOG_EVERY_N(INFO, 100) + << "multiget_data single_row, cost(us):" << watch.elapsed_time() / 1000; + *response->add_row_ids() = row_id; + }); TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet( row_id.tablet_id(), true /*include deleted*/); RowsetId rowset_id; Review Comment: move it after if(!tablet) continue; ########## gensrc/proto/internal_service.proto: ########## @@ -593,23 +593,35 @@ message PFetchTableSchemaResult { repeated PTypeDesc column_types = 4; } +message RowId { Review Comment: RowLocation is better ########## be/src/exec/rowid_fetcher.cpp: ########## @@ -87,43 +93,84 @@ PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnStrin row_id.set_ordinal_id(location->row_location.row_id); *mget_req.add_rowids() = std::move(row_id); } - mget_req.set_be_exec_version(_st->be_exec_version()); + PUniqueId& query_id = *mget_req.mutable_query_id(); + query_id.set_hi(_fetch_option.runtime_state->query_id().hi); + query_id.set_lo(_fetch_option.runtime_state->query_id().lo); + mget_req.set_be_exec_version(_fetch_option.runtime_state->be_exec_version()); + mget_req.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store); return mget_req; } static void fetch_callback(bthread::CountdownEvent* counter) { Defer __defer([&] { counter->signal(); }); } -static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps, - const std::vector<brpc::Controller>& cntls, - vectorized::MutableBlock* output_block) { +Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, + const std::vector<PMultiGetResponse>& rsps, + const std::vector<brpc::Controller>& cntls, + vectorized::Block* output_block, + std::vector<RowId>* rows_id) const { + output_block->clear(); for (const auto& cntl : cntls) { if (cntl.Failed()) { LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText() << ", host:" << cntl.remote_side(); return Status::InternalError(cntl.ErrorText()); } } - for (const auto& resp : rsps) { + + auto merge_function = [&](const PMultiGetResponse& resp) { Status st(resp.status()); if (!st.ok()) { LOG(WARNING) << "Failed to fetch " << st.to_string(); return 
st; } + for (const RowId& row_id : resp.row_ids()) { + rows_id->push_back(row_id); + } + // Merge binary rows + if (request.fetch_row_store()) { + if (output_block->is_empty_column()) { Review Comment: DCHECK resp.row_ids().size() == resp.binary_row_data_size() ########## be/src/olap/tablet.cpp: ########## @@ -2481,7 +2481,7 @@ Status Tablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segi Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, RowsetSharedPtr input_rowset, const TupleDescriptor* desc, - OlapReaderStatistics& stats, vectorized::Block* block, + OlapReaderStatistics& stats, std::string& values, Review Comment: It may be more efficient, but also affects local encapsulation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org