github-actions[bot] commented on code in PR #22504: URL: https://github.com/apache/doris/pull/22504#discussion_r1363072112
########## be/src/olap/calc_delete_bitmap_executor.cpp: ########## @@ -30,10 +30,12 @@ namespace doris { using namespace ErrorCode; -Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset, - const segment_v2::SegmentSharedPtr& cur_segment, - const std::vector<RowsetSharedPtr>& target_rowsets, - int64_t end_version, RowsetWriter* rowset_writer) { +Status CalcDeleteBitmapToken::submit( Review Comment: warning: method 'submit' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status CalcDeleteBitmapToken::submit( ``` ########## be/src/olap/rowset/segment_v2/segment_writer.cpp: ########## @@ -541,76 +604,103 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector<bool>& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos) { - // create old value columns - std::vector<uint32_t> cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids; - auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing); - CHECK(cids_missing.size() == old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->store_row_column(); - // record real pos, key is input line num, value is old_block line num - std::map<uint32_t, uint32_t> read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - read_index[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column(rowset, seg_it.first, rids, - cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << 
"failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } +void SegmentWriter::_calc_indicator_maps(uint32_t row_pos, uint32_t num_rows, + const IndicatorMapsVertical& indicator_maps_vertical) { + _indicator_maps.reset(new std::map<uint32_t, std::vector<uint32_t>>); // fixme(baohan): ? + for (auto [cid, indicator_map] : indicator_maps_vertical) { + for (uint32_t pos = row_pos; pos < row_pos + num_rows; pos++) { + if (indicator_map != nullptr && indicator_map[pos] != 0) { + (*_indicator_maps)[pos].emplace_back(cid); } } } +} + +// Consider a merge-on-write unique table with columns [k1, k2, v1, v2, v3, v4, v5] where k1, k2 are key columns +// and v1, v2, v3, v4, v5 are value columns. The table has the following data: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |1 |1 |1 |1 |1 +// 2 |2 |2 |2 |2 |2 |2 +// 3 |3 |3 |3 |3 |3 |3 +// 4 |4 |4 |4 |4 |4 |4 +// 5 |5 |5 |5 |5 |5 |5 +// The user inserts the following data for partial update. The character `?` means that the cell is filled +// with an indicator value (currently we use null as the indicator value). +// row_num k1|k2|v1|v2|v3 +// 1 1 |1 |10|10|10 +// 2 2 |2 |? |20|20 +// 3 3 |3 |30|30|? +// 4 4 |4 |40|40|40 +// 5 5 |5 |50|? |50 +// Here, full_columns = [k1, k2, v1, v2, v3, v4, v5] +// old_full_read_columns = [v4, v5], the old values from the previous rows will be read into these columns. +// old_point_read_columns = [k1, k2, v1, v2, v3], the old values from the previous rows will be read into these columns +// if the corresponding columns in the input block have cells with the indicator value. 
+// Because the column is immutable, filled_including_value_columns will store the data merged from +// the original input block and old_point_read_columns. After the insertion, the data in the table will be: +// k1|k2|v1|v2|v3|v4|v5 +// 1 |1 |10|10|10|1 |1 +// 2 |2 |2 |20|20|2 |2 +// 3 |3 |30|30|3 |3 |3 +// 4 |4 |40|40|40|4 |4 +// 5 |5 |50|5 |50|5 |5 +Status SegmentWriter::fill_missing_columns( Review Comment: warning: method 'fill_missing_columns' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status SegmentWriter::fill_missing_columns( ``` ########## be/src/olap/tablet.cpp: ########## @@ -2709,15 +2709,16 @@ Status Tablet::_get_segment_column_iterator( } // fetch value by row column -Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint32_t segid, - const std::vector<uint32_t>& rowids, - const std::vector<uint32_t>& cids, - vectorized::Block& block) { +Status Tablet::fetch_value_through_row_column( + RowsetSharedPtr input_rowset, uint32_t segid, const std::vector<uint32_t>& rids, Review Comment: warning: method 'fetch_value_through_row_column' can be made static [readability-convert-member-functions-to-static] be/src/olap/tablet.cpp:2711: ```diff - } + }static ``` ########## be/src/olap/tablet.cpp: ########## @@ -3132,108 +3165,274 @@ Status Tablet::generate_new_block_for_partial_update( const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, + std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps, vectorized::Block* output_block) { // do partial update related works // 1. read columns by read plan // 2. generate new block // 3. write a new segment and modify rowset meta // 4. 
mark current keys deleted CHECK(output_block); - auto full_mutable_columns = output_block->mutate_columns(); - auto old_block = rowset_schema->create_block_by_cids(missing_cids); - auto update_block = rowset_schema->create_block_by_cids(update_cids); - - std::map<uint32_t, uint32_t> read_index_old; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset, - old_block, &read_index_old)); - - std::map<uint32_t, uint32_t> read_index_update; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update, - rsid_to_rowset, update_block, &read_index_update)); + if (!indicator_maps) { + auto full_mutable_columns = output_block->mutate_columns(); + auto old_block = rowset_schema->create_block_by_cids(missing_cids); + auto update_block = rowset_schema->create_block_by_cids(update_cids); + + std::map<uint32_t, uint32_t> read_index_old; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori, + &missing_cids, &old_block, &read_index_old)); + + std::map<uint32_t, uint32_t> read_index_update; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update, + &update_cids, &update_block, &read_index_update)); + + // build full block + CHECK(read_index_old.size() == read_index_update.size()); + for (auto i = 0; i < missing_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_old.size(); ++idx) { + full_mutable_columns[missing_cids[i]]->insert_from( + *old_block.get_columns_with_type_and_name()[i].column.get(), + read_index_old[idx]); + } + } + for (auto i = 0; i < update_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_update.size(); ++idx) { + full_mutable_columns[update_cids[i]]->insert_from( + *update_block.get_columns_with_type_and_name()[i].column.get(), + read_index_update[idx]); + } + } + VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); + } else { + std::unordered_set<uint32_t> cids_point_read; + for (const auto& [_, cids] : 
*indicator_maps) { + for (uint32_t cid : cids) { + cids_point_read.insert(cid); + } + } + std::vector<uint32_t> point_read_cids; + point_read_cids.reserve(cids_point_read.size()); + for (uint32_t cid : cids_point_read) { + point_read_cids.emplace_back(cid); + } + auto full_mutable_columns = output_block->mutate_columns(); + auto old_full_read_block = rowset_schema->create_block_by_cids(missing_cids); + auto point_read_block = rowset_schema->create_block_by_cids(point_read_cids); + + std::map<uint32_t, uint32_t> full_read_index_old; + std::map<uint32_t, std::map<uint32_t, uint32_t>> point_read_index_old; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori, + &missing_cids, &point_read_cids, &old_full_read_block, + &point_read_block, &full_read_index_old, + &point_read_index_old)); + + auto update_block = rowset_schema->create_block_by_cids(update_cids); + std::map<uint32_t, uint32_t> read_index_update; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update, + &update_cids, &update_block, &read_index_update)); + // construct the full block + CHECK(full_read_index_old.size() == read_index_update.size()); + + for (size_t i = 0; i < missing_cids.size(); ++i) { + for (auto idx = 0; idx < full_read_index_old.size(); ++idx) { + full_mutable_columns[missing_cids[i]]->insert_from( + *old_full_read_block.get_by_position(i).column.get(), + full_read_index_old[idx]); + } + } - // build full block - CHECK(read_index_old.size() == read_index_update.size()); - for (auto i = 0; i < missing_cids.size(); ++i) { - for (auto idx = 0; idx < read_index_old.size(); ++idx) { - full_mutable_columns[missing_cids[i]]->insert_from( - *old_block.get_columns_with_type_and_name()[i].column.get(), - read_index_old[idx]); + for (size_t i = 0; i < update_cids.size(); i++) { + uint32_t cid = update_cids[i]; + if (!cids_point_read.contains(cid)) { + for (auto idx = 0; idx < full_read_index_old.size(); ++idx) { + 
full_mutable_columns[cid]->insert_from( + *update_block.get_by_position(i).column.get(), read_index_update[idx]); + } + } } - } - for (auto i = 0; i < update_cids.size(); ++i) { - for (auto idx = 0; idx < read_index_update.size(); ++idx) { - full_mutable_columns[update_cids[i]]->insert_from( - *update_block.get_columns_with_type_and_name()[i].column.get(), - read_index_update[idx]); + + for (size_t i = 0; i < point_read_cids.size(); i++) { + uint32_t cid = point_read_cids[i]; + for (uint32_t idx = 0; i < full_read_index_old.size(); i++) { + if (point_read_index_old[cid].contains(idx)) { + full_mutable_columns[cid]->insert_from( + *point_read_block.get_by_position(i).column.get(), + point_read_index_old[cid][idx]); + } else { + full_mutable_columns[cid]->insert_from( + *update_block.get_by_position(i).column.get(), idx); + } + } } } - VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); return Status::OK(); } - -// read columns by read plan -// read_index: ori_pos-> block_idx Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema, - const std::vector<uint32_t> cids_to_read, - const PartialUpdateReadPlan& read_plan, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, - vectorized::Block& block, - std::map<uint32_t, uint32_t>* read_index) { - bool has_row_column = tablet_schema->store_row_column(); - auto mutable_columns = block.mutate_columns(); - size_t read_idx = 0; - for (auto rs_it : read_plan) { - for (auto seg_it : rs_it.second) { - auto rowset_iter = rsid_to_rowset.find(rs_it.first); - CHECK(rowset_iter != rsid_to_rowset.end()); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - (*read_index)[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = fetch_value_through_row_column(rowset_iter->second, seg_it.first, rids, - cids_to_read, block); + const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>* cids_full_read, + vectorized::Block* 
block_full_read, + std::map<uint32_t, uint32_t>* full_read_index) { + auto full_read_columns = block_full_read->mutate_columns(); + uint32_t read_idx1 = 0; + + if (std::holds_alternative<RowStoreReadPlan>(read_plan)) { + const auto& row_store_read_plan = std::get<RowStoreReadPlan>(read_plan); + for (const auto& [rowset_id, segment_read_infos] : row_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + for (const auto& [segment_id, rows_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (const auto& [id_and_pos, cids] : rows_info) { + // set read index for missing columns + rids.emplace_back(id_and_pos.rid); + (*full_read_index)[id_and_pos.pos] = read_idx1++; + } + + auto st = fetch_value_through_row_column(rowset, segment_id, rids, rows_info, + cids_full_read, nullptr, block_full_read, + nullptr, false); if (!st.ok()) { LOG(WARNING) << "failed to fetch value through row column"; return st; } - continue; } - for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { - TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]); - auto st = fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids, - tablet_column, mutable_columns[cid]); - // set read value to output block + } + } else { + const auto& column_store_read_plan = std::get<ColumnStoreReadPlan>(read_plan); + for (const auto& [rowset_id, segment_read_infos] : column_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + + for (const auto& [segment_id, columns_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (auto [rid, pos] : columns_info.missing_column_rows) { + rids.emplace_back(rid); + // set read index for missing columns + (*full_read_index)[pos] = read_idx1++; + } + + // read values for missing columns + for (size_t i = 0; i < cids_full_read->size(); ++i) { + TabletColumn tablet_column = tablet_schema->column(cids_full_read->at(i)); + auto st = fetch_value_by_rowids(rowset, segment_id, rids, 
tablet_column, + full_read_columns[i]); + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value by rowids"; + return st; + } + } + } + } + } + return Status::OK(); +} + +Status Tablet::read_columns_by_plan( + TabletSchemaSPtr tablet_schema, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Review Comment: warning: method 'read_columns_by_plan' can be made static [readability-convert-member-functions-to-static] be/src/olap/tablet.cpp:3330: ```diff - } + }static ``` ########## be/src/olap/tablet.cpp: ########## @@ -3132,108 +3165,274 @@ Status Tablet::generate_new_block_for_partial_update( const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, + std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> indicator_maps, vectorized::Block* output_block) { // do partial update related works // 1. read columns by read plan // 2. generate new block // 3. write a new segment and modify rowset meta // 4. 
mark current keys deleted CHECK(output_block); - auto full_mutable_columns = output_block->mutate_columns(); - auto old_block = rowset_schema->create_block_by_cids(missing_cids); - auto update_block = rowset_schema->create_block_by_cids(update_cids); - - std::map<uint32_t, uint32_t> read_index_old; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset, - old_block, &read_index_old)); - - std::map<uint32_t, uint32_t> read_index_update; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update, - rsid_to_rowset, update_block, &read_index_update)); + if (!indicator_maps) { + auto full_mutable_columns = output_block->mutate_columns(); + auto old_block = rowset_schema->create_block_by_cids(missing_cids); + auto update_block = rowset_schema->create_block_by_cids(update_cids); + + std::map<uint32_t, uint32_t> read_index_old; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori, + &missing_cids, &old_block, &read_index_old)); + + std::map<uint32_t, uint32_t> read_index_update; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update, + &update_cids, &update_block, &read_index_update)); + + // build full block + CHECK(read_index_old.size() == read_index_update.size()); + for (auto i = 0; i < missing_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_old.size(); ++idx) { + full_mutable_columns[missing_cids[i]]->insert_from( + *old_block.get_columns_with_type_and_name()[i].column.get(), + read_index_old[idx]); + } + } + for (auto i = 0; i < update_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_update.size(); ++idx) { + full_mutable_columns[update_cids[i]]->insert_from( + *update_block.get_columns_with_type_and_name()[i].column.get(), + read_index_update[idx]); + } + } + VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); + } else { + std::unordered_set<uint32_t> cids_point_read; + for (const auto& [_, cids] : 
*indicator_maps) { + for (uint32_t cid : cids) { + cids_point_read.insert(cid); + } + } + std::vector<uint32_t> point_read_cids; + point_read_cids.reserve(cids_point_read.size()); + for (uint32_t cid : cids_point_read) { + point_read_cids.emplace_back(cid); + } + auto full_mutable_columns = output_block->mutate_columns(); + auto old_full_read_block = rowset_schema->create_block_by_cids(missing_cids); + auto point_read_block = rowset_schema->create_block_by_cids(point_read_cids); + + std::map<uint32_t, uint32_t> full_read_index_old; + std::map<uint32_t, std::map<uint32_t, uint32_t>> point_read_index_old; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori, + &missing_cids, &point_read_cids, &old_full_read_block, + &point_read_block, &full_read_index_old, + &point_read_index_old)); + + auto update_block = rowset_schema->create_block_by_cids(update_cids); + std::map<uint32_t, uint32_t> read_index_update; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update, + &update_cids, &update_block, &read_index_update)); + // construct the full block + CHECK(full_read_index_old.size() == read_index_update.size()); + + for (size_t i = 0; i < missing_cids.size(); ++i) { + for (auto idx = 0; idx < full_read_index_old.size(); ++idx) { + full_mutable_columns[missing_cids[i]]->insert_from( + *old_full_read_block.get_by_position(i).column.get(), + full_read_index_old[idx]); + } + } - // build full block - CHECK(read_index_old.size() == read_index_update.size()); - for (auto i = 0; i < missing_cids.size(); ++i) { - for (auto idx = 0; idx < read_index_old.size(); ++idx) { - full_mutable_columns[missing_cids[i]]->insert_from( - *old_block.get_columns_with_type_and_name()[i].column.get(), - read_index_old[idx]); + for (size_t i = 0; i < update_cids.size(); i++) { + uint32_t cid = update_cids[i]; + if (!cids_point_read.contains(cid)) { + for (auto idx = 0; idx < full_read_index_old.size(); ++idx) { + 
full_mutable_columns[cid]->insert_from( + *update_block.get_by_position(i).column.get(), read_index_update[idx]); + } + } } - } - for (auto i = 0; i < update_cids.size(); ++i) { - for (auto idx = 0; idx < read_index_update.size(); ++idx) { - full_mutable_columns[update_cids[i]]->insert_from( - *update_block.get_columns_with_type_and_name()[i].column.get(), - read_index_update[idx]); + + for (size_t i = 0; i < point_read_cids.size(); i++) { + uint32_t cid = point_read_cids[i]; + for (uint32_t idx = 0; i < full_read_index_old.size(); i++) { + if (point_read_index_old[cid].contains(idx)) { + full_mutable_columns[cid]->insert_from( + *point_read_block.get_by_position(i).column.get(), + point_read_index_old[cid][idx]); + } else { + full_mutable_columns[cid]->insert_from( + *update_block.get_by_position(i).column.get(), idx); + } + } } } - VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); return Status::OK(); } - -// read columns by read plan -// read_index: ori_pos-> block_idx Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema, - const std::vector<uint32_t> cids_to_read, - const PartialUpdateReadPlan& read_plan, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, - vectorized::Block& block, - std::map<uint32_t, uint32_t>* read_index) { - bool has_row_column = tablet_schema->store_row_column(); - auto mutable_columns = block.mutate_columns(); - size_t read_idx = 0; - for (auto rs_it : read_plan) { - for (auto seg_it : rs_it.second) { - auto rowset_iter = rsid_to_rowset.find(rs_it.first); - CHECK(rowset_iter != rsid_to_rowset.end()); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - (*read_index)[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = fetch_value_through_row_column(rowset_iter->second, seg_it.first, rids, - cids_to_read, block); + const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>* cids_full_read, + vectorized::Block* 
block_full_read, + std::map<uint32_t, uint32_t>* full_read_index) { + auto full_read_columns = block_full_read->mutate_columns(); + uint32_t read_idx1 = 0; + + if (std::holds_alternative<RowStoreReadPlan>(read_plan)) { + const auto& row_store_read_plan = std::get<RowStoreReadPlan>(read_plan); + for (const auto& [rowset_id, segment_read_infos] : row_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + for (const auto& [segment_id, rows_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (const auto& [id_and_pos, cids] : rows_info) { + // set read index for missing columns + rids.emplace_back(id_and_pos.rid); + (*full_read_index)[id_and_pos.pos] = read_idx1++; + } + + auto st = fetch_value_through_row_column(rowset, segment_id, rids, rows_info, + cids_full_read, nullptr, block_full_read, + nullptr, false); if (!st.ok()) { LOG(WARNING) << "failed to fetch value through row column"; return st; } - continue; } - for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { - TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]); - auto st = fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids, - tablet_column, mutable_columns[cid]); - // set read value to output block + } + } else { + const auto& column_store_read_plan = std::get<ColumnStoreReadPlan>(read_plan); + for (const auto& [rowset_id, segment_read_infos] : column_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + + for (const auto& [segment_id, columns_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (auto [rid, pos] : columns_info.missing_column_rows) { + rids.emplace_back(rid); + // set read index for missing columns + (*full_read_index)[pos] = read_idx1++; + } + + // read values for missing columns + for (size_t i = 0; i < cids_full_read->size(); ++i) { + TabletColumn tablet_column = tablet_schema->column(cids_full_read->at(i)); + auto st = fetch_value_by_rowids(rowset, segment_id, rids, 
tablet_column, + full_read_columns[i]); + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value by rowids"; + return st; + } + } + } + } + } + return Status::OK(); +} + +Status Tablet::read_columns_by_plan( + TabletSchemaSPtr tablet_schema, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, + const PartialUpdateReadPlan& read_plan, const std::vector<uint32_t>* cids_full_read, + const std::vector<uint32_t>* cids_point_read, vectorized::Block* block_full_read, + vectorized::Block* block_point_read, std::map<uint32_t, uint32_t>* full_read_index, + std::map<uint32_t, std::map<uint32_t, uint32_t>>* point_read_index) { + auto full_read_columns = block_full_read->mutate_columns(); + auto point_read_columns = block_point_read->mutate_columns(); + + uint32_t read_idx1 = 0; + std::map<uint32_t, uint32_t> read_idx2; + for (uint32_t cid : *cids_point_read) { + read_idx2[cid] = 0; + } + + if (std::holds_alternative<RowStoreReadPlan>(read_plan)) { + const auto& row_store_read_plan = std::get<RowStoreReadPlan>(read_plan); + for (const auto& [rowset_id, segment_read_infos] : row_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + for (const auto& [segment_id, rows_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (const auto& [id_and_pos, cids] : rows_info) { + // set read index for missing columns + rids.emplace_back(id_and_pos.rid); + (*full_read_index)[id_and_pos.pos] = read_idx1++; + for (const auto cid : cids) { + // set read index for partial update columns + (*point_read_index)[cid][id_and_pos.pos] = read_idx2[cid]++; + } + } + + auto st = fetch_value_through_row_column(rowset, segment_id, rids, rows_info, + cids_full_read, cids_point_read, + block_full_read, block_point_read, true); if (!st.ok()) { - LOG(WARNING) << "failed to fetch value"; + LOG(WARNING) << "failed to fetch value through row column"; return st; } } } + } else { + const auto& column_store_read_plan = std::get<ColumnStoreReadPlan>(read_plan); + for 
(const auto& [rowset_id, segment_read_infos] : column_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + + for (const auto& [segment_id, columns_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (auto [rid, pos] : columns_info.missing_column_rows) { + rids.emplace_back(rid); + // set read index for missing columns + (*full_read_index)[pos] = read_idx1++; + } + + // read values for missing columns + for (size_t i = 0; i < cids_full_read->size(); ++i) { + TabletColumn tablet_column = tablet_schema->column(cids_full_read->at(i)); + auto st = fetch_value_by_rowids(rowset, segment_id, rids, tablet_column, + full_read_columns[i]); + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value by rowids"; + return st; + } + } + // read values for cells with indicator values in including columns + for (size_t i = 0; i < cids_point_read->size(); i++) { + const auto& rows_info = columns_info.partial_update_rows; + uint32_t cid = cids_point_read->at(i); + if (!rows_info.empty() && rows_info.contains(cid)) { + std::vector<uint32_t> rids; + for (auto [rid, pos] : rows_info.at(cid)) { + rids.emplace_back(rid); + // set read index for partial update columns + (*point_read_index)[cid][pos] = read_idx2[cid]++; + } + + TabletColumn tablet_column = tablet_schema->column(cid); + auto st = fetch_value_by_rowids(rowset, segment_id, rids, tablet_column, + point_read_columns[i]); + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value by rowids"; + return st; + } + } + } + } + } } return Status::OK(); } -void Tablet::prepare_to_read(const RowLocation& row_location, size_t pos, - PartialUpdateReadPlan* read_plan) { - auto rs_it = read_plan->find(row_location.rowset_id); - if (rs_it == read_plan->end()) { - std::map<uint32_t, std::vector<RidAndPos>> segid_to_rid; - std::vector<RidAndPos> rid_pos; - rid_pos.emplace_back(RidAndPos {row_location.row_id, pos}); - segid_to_rid.emplace(row_location.segment_id, rid_pos); - 
read_plan->emplace(row_location.rowset_id, segid_to_rid); - return; - } - auto seg_it = rs_it->second.find(row_location.segment_id); - if (seg_it == rs_it->second.end()) { - std::vector<RidAndPos> rid_pos; - rid_pos.emplace_back(RidAndPos {row_location.row_id, pos}); - rs_it->second.emplace(row_location.segment_id, rid_pos); - return; +void Tablet::prepare_to_read(PartialUpdateReadPlan& read_plan, const RowLocation& row_location, Review Comment: warning: method 'prepare_to_read' can be made static [readability-convert-member-functions-to-static] ```suggestion }static ``` ########## be/src/vec/jsonb/serialize.cpp: ########## @@ -78,6 +78,27 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, } } +void JsonbSerializeUtil::jsonb_to_block( Review Comment: warning: method 'jsonb_to_block' can be made static [readability-convert-member-functions-to-static] ```suggestion static void JsonbSerializeUtil::jsonb_to_block( ``` ########## be/src/vec/jsonb/serialize.cpp: ########## @@ -78,6 +78,27 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, } } +void JsonbSerializeUtil::jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const ColumnString& jsonb_column, + const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read, + const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& Review Comment: warning: all parameters should be named in a function [readability-named-parameter] ```suggestion const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t> /*unused*/>& ``` ########## be/src/olap/tablet.cpp: ########## @@ -3132,108 +3165,274 @@ Status Tablet::generate_new_block_for_partial_update( const std::vector<uint32>& update_cids, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, + std::shared_ptr<std::map<uint32_t, std::vector<uint32_t>>> 
indicator_maps, vectorized::Block* output_block) { // do partial update related works // 1. read columns by read plan // 2. generate new block // 3. write a new segment and modify rowset meta // 4. mark current keys deleted CHECK(output_block); - auto full_mutable_columns = output_block->mutate_columns(); - auto old_block = rowset_schema->create_block_by_cids(missing_cids); - auto update_block = rowset_schema->create_block_by_cids(update_cids); - - std::map<uint32_t, uint32_t> read_index_old; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset, - old_block, &read_index_old)); - - std::map<uint32_t, uint32_t> read_index_update; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update, - rsid_to_rowset, update_block, &read_index_update)); + if (!indicator_maps) { + auto full_mutable_columns = output_block->mutate_columns(); + auto old_block = rowset_schema->create_block_by_cids(missing_cids); + auto update_block = rowset_schema->create_block_by_cids(update_cids); + + std::map<uint32_t, uint32_t> read_index_old; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori, + &missing_cids, &old_block, &read_index_old)); + + std::map<uint32_t, uint32_t> read_index_update; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update, + &update_cids, &update_block, &read_index_update)); + + // build full block + CHECK(read_index_old.size() == read_index_update.size()); + for (auto i = 0; i < missing_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_old.size(); ++idx) { + full_mutable_columns[missing_cids[i]]->insert_from( + *old_block.get_columns_with_type_and_name()[i].column.get(), + read_index_old[idx]); + } + } + for (auto i = 0; i < update_cids.size(); ++i) { + for (auto idx = 0; idx < read_index_update.size(); ++idx) { + full_mutable_columns[update_cids[i]]->insert_from( + *update_block.get_columns_with_type_and_name()[i].column.get(), + 
read_index_update[idx]); + } + } + VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); + } else { + std::unordered_set<uint32_t> cids_point_read; + for (const auto& [_, cids] : *indicator_maps) { + for (uint32_t cid : cids) { + cids_point_read.insert(cid); + } + } + std::vector<uint32_t> point_read_cids; + point_read_cids.reserve(cids_point_read.size()); + for (uint32_t cid : cids_point_read) { + point_read_cids.emplace_back(cid); + } + auto full_mutable_columns = output_block->mutate_columns(); + auto old_full_read_block = rowset_schema->create_block_by_cids(missing_cids); + auto point_read_block = rowset_schema->create_block_by_cids(point_read_cids); + + std::map<uint32_t, uint32_t> full_read_index_old; + std::map<uint32_t, std::map<uint32_t, uint32_t>> point_read_index_old; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_ori, + &missing_cids, &point_read_cids, &old_full_read_block, + &point_read_block, &full_read_index_old, + &point_read_index_old)); + + auto update_block = rowset_schema->create_block_by_cids(update_cids); + std::map<uint32_t, uint32_t> read_index_update; + RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, rsid_to_rowset, read_plan_update, + &update_cids, &update_block, &read_index_update)); + // construct the full block + CHECK(full_read_index_old.size() == read_index_update.size()); + + for (size_t i = 0; i < missing_cids.size(); ++i) { + for (auto idx = 0; idx < full_read_index_old.size(); ++idx) { + full_mutable_columns[missing_cids[i]]->insert_from( + *old_full_read_block.get_by_position(i).column.get(), + full_read_index_old[idx]); + } + } - // build full block - CHECK(read_index_old.size() == read_index_update.size()); - for (auto i = 0; i < missing_cids.size(); ++i) { - for (auto idx = 0; idx < read_index_old.size(); ++idx) { - full_mutable_columns[missing_cids[i]]->insert_from( - *old_block.get_columns_with_type_and_name()[i].column.get(), - read_index_old[idx]); + for (size_t i = 0; 
i < update_cids.size(); i++) { + uint32_t cid = update_cids[i]; + if (!cids_point_read.contains(cid)) { + for (auto idx = 0; idx < full_read_index_old.size(); ++idx) { + full_mutable_columns[cid]->insert_from( + *update_block.get_by_position(i).column.get(), read_index_update[idx]); + } + } } - } - for (auto i = 0; i < update_cids.size(); ++i) { - for (auto idx = 0; idx < read_index_update.size(); ++idx) { - full_mutable_columns[update_cids[i]]->insert_from( - *update_block.get_columns_with_type_and_name()[i].column.get(), - read_index_update[idx]); + + for (size_t i = 0; i < point_read_cids.size(); i++) { + uint32_t cid = point_read_cids[i]; + for (uint32_t idx = 0; i < full_read_index_old.size(); i++) { + if (point_read_index_old[cid].contains(idx)) { + full_mutable_columns[cid]->insert_from( + *point_read_block.get_by_position(i).column.get(), + point_read_index_old[cid][idx]); + } else { + full_mutable_columns[cid]->insert_from( + *update_block.get_by_position(i).column.get(), idx); + } + } } } - VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); return Status::OK(); } - -// read columns by read plan -// read_index: ori_pos-> block_idx Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema, - const std::vector<uint32_t> cids_to_read, - const PartialUpdateReadPlan& read_plan, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, - vectorized::Block& block, - std::map<uint32_t, uint32_t>* read_index) { - bool has_row_column = tablet_schema->store_row_column(); - auto mutable_columns = block.mutate_columns(); - size_t read_idx = 0; - for (auto rs_it : read_plan) { - for (auto seg_it : rs_it.second) { - auto rowset_iter = rsid_to_rowset.find(rs_it.first); - CHECK(rowset_iter != rsid_to_rowset.end()); - std::vector<uint32_t> rids; - for (auto id_and_pos : seg_it.second) { - rids.emplace_back(id_and_pos.rid); - (*read_index)[id_and_pos.pos] = read_idx++; - } - if (has_row_column) { - auto st = 
fetch_value_through_row_column(rowset_iter->second, seg_it.first, rids, - cids_to_read, block); + const PartialUpdateReadPlan& read_plan, + const std::vector<uint32_t>* cids_full_read, + vectorized::Block* block_full_read, + std::map<uint32_t, uint32_t>* full_read_index) { + auto full_read_columns = block_full_read->mutate_columns(); + uint32_t read_idx1 = 0; + + if (std::holds_alternative<RowStoreReadPlan>(read_plan)) { + const auto& row_store_read_plan = std::get<RowStoreReadPlan>(read_plan); + for (const auto& [rowset_id, segment_read_infos] : row_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + for (const auto& [segment_id, rows_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (const auto& [id_and_pos, cids] : rows_info) { + // set read index for missing columns + rids.emplace_back(id_and_pos.rid); + (*full_read_index)[id_and_pos.pos] = read_idx1++; + } + + auto st = fetch_value_through_row_column(rowset, segment_id, rids, rows_info, + cids_full_read, nullptr, block_full_read, + nullptr, false); if (!st.ok()) { LOG(WARNING) << "failed to fetch value through row column"; return st; } - continue; } - for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { - TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]); - auto st = fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids, - tablet_column, mutable_columns[cid]); - // set read value to output block + } + } else { + const auto& column_store_read_plan = std::get<ColumnStoreReadPlan>(read_plan); + for (const auto& [rowset_id, segment_read_infos] : column_store_read_plan) { + auto rowset = rsid_to_rowset.at(rowset_id); + CHECK(rowset); + + for (const auto& [segment_id, columns_info] : segment_read_infos) { + std::vector<uint32_t> rids; + for (auto [rid, pos] : columns_info.missing_column_rows) { + rids.emplace_back(rid); + // set read index for missing columns + (*full_read_index)[pos] = read_idx1++; + } + + // read values for missing 
columns + for (size_t i = 0; i < cids_full_read->size(); ++i) { + TabletColumn tablet_column = tablet_schema->column(cids_full_read->at(i)); + auto st = fetch_value_by_rowids(rowset, segment_id, rids, tablet_column, + full_read_columns[i]); + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value by rowids"; + return st; + } + } + } + } + } + return Status::OK(); +} + +Status Tablet::read_columns_by_plan( + TabletSchemaSPtr tablet_schema, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, + const PartialUpdateReadPlan& read_plan, const std::vector<uint32_t>* cids_full_read, + const std::vector<uint32_t>* cids_point_read, vectorized::Block* block_full_read, + vectorized::Block* block_point_read, std::map<uint32_t, uint32_t>* full_read_index, + std::map<uint32_t, std::map<uint32_t, uint32_t>>* point_read_index) { + auto full_read_columns = block_full_read->mutate_columns(); Review Comment: warning: all parameters should be named in a function [readability-named-parameter] ```suggestion uint32_t> /*unused*/>* point_read_index) { ``` ########## be/src/vec/jsonb/serialize.h: ########## @@ -40,13 +40,30 @@ class JsonbSerializeUtil { public: static void block_to_jsonb(const TabletSchema& schema, const Block& block, ColumnString& dst, int num_cols, const DataTypeSerDeSPtrs& serdes); - // batch rows + // for batch rows static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const ColumnString& jsonb_column, const std::unordered_map<uint32_t, uint32_t>& col_id_to_idx, Block& dst); - // single row + static void jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const ColumnString& jsonb_column, + const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read, + const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& + col_uid_to_idx_cid_point_read, + const std::vector<ReadRowsInfo>& rows_info, Block& block_full_read, + Block& block_point_read); + + // for a single row static void 
jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const char* data, size_t size, const std::unordered_map<uint32_t, uint32_t>& col_id_to_idx, Block& dst); + static void jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const char* data, size_t size, + const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read, + const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& Review Comment: warning: parameter 6 is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& ``` ########## be/src/olap/txn_manager.cpp: ########## @@ -210,6 +210,42 @@ void TxnManager::set_txn_related_delete_bitmap( } } +void TxnManager::set_txn_related_delete_bitmap_and_indicator_maps( Review Comment: warning: method 'set_txn_related_delete_bitmap_and_indicator_maps' can be made static [readability-convert-member-functions-to-static] ```suggestion static void TxnManager::set_txn_related_delete_bitmap_and_indicator_maps( ``` ########## be/src/vec/jsonb/serialize.cpp: ########## @@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const } } +void JsonbSerializeUtil::jsonb_to_block( Review Comment: warning: method 'jsonb_to_block' can be made static [readability-convert-member-functions-to-static] ```suggestion static void JsonbSerializeUtil::jsonb_to_block( ``` ########## be/src/vec/jsonb/serialize.h: ########## @@ -40,13 +40,30 @@ class JsonbSerializeUtil { public: static void block_to_jsonb(const TabletSchema& schema, const Block& block, ColumnString& dst, int num_cols, const DataTypeSerDeSPtrs& serdes); - // batch rows + // for batch rows static void jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const ColumnString& jsonb_column, const std::unordered_map<uint32_t, uint32_t>& col_id_to_idx, 
Block& dst); - // single row + static void jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const ColumnString& jsonb_column, + const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read, + const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& Review Comment: warning: parameter 5 is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& ``` ########## be/src/vec/jsonb/serialize.cpp: ########## @@ -109,4 +130,41 @@ void JsonbSerializeUtil::jsonb_to_block(const DataTypeSerDeSPtrs& serdes, const } } +void JsonbSerializeUtil::jsonb_to_block( + const DataTypeSerDeSPtrs& serdes_full_read, const DataTypeSerDeSPtrs& serdes_point_read, + const char* data, size_t size, + const std::unordered_map<uint32_t, uint32_t>& col_uid_to_idx_full_read, + const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>>& Review Comment: warning: all parameters should be named in a function [readability-named-parameter] ```suggestion const std::unordered_map<uint32_t, std::pair<uint32_t, uint32_t> /*unused*/>& ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org