dataroaring commented on code in PR #39756:
URL: https://github.com/apache/doris/pull/39756#discussion_r1732864192
##########
be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp:
##########
@@ -577,19 +579,301 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
     return Status::OK();
 }
 
+Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(
+        RowsInBlock& data, vectorized::Block& full_block) {
+    DCHECK(_is_mow());
+    DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
+    DCHECK(_opts.rowset_ctx->partial_update_info->is_flexible_partial_update());
+
+    // data.block has the same schema with full_block
+    DCHECK(data.block->columns() == _tablet_schema->num_columns());
+
+    // create full block and fill with sort key columns
+    full_block = _tablet_schema->create_block();
+
+    auto segment_start_pos = _column_writers.front()->get_next_rowid();
+
+    // 1. encode key columns
+    // we can only encode sort key columns currently because all non-key columns in flexible partial update
+    // can have missing cells
+    std::vector<vectorized::IOlapColumnDataAccessor*> key_columns;
+    for (std::size_t cid {0}; cid < _num_sort_key_columns; cid++) {
+        full_block.replace_by_position(cid, data.block->get_by_position(cid).column);
+        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                full_block.get_by_position(cid), data.row_pos, data.num_rows, cid));
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        if (!status.ok()) {
+            return status;
+        }
+        key_columns.push_back(column);
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
+                                                     data.num_rows));
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + data.num_rows);
+    }
+
+    // 2. encode sequence column
+    // We encode the sequence column even though it may have invalid values in some rows, because we need
+    // to encode the value of the sequence column into the key for rows that have a valid sequence value
+    // during lookup_row_key. We will encode the sequence column again at the end of this method; at that
+    // time, we have a valid sequence column to encode the key with seq col.
+    vectorized::IOlapColumnDataAccessor* seq_column = nullptr;
+    int32_t seq_col_unique_id = -1;
+    int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid();
+    bool schema_has_sequence_col = _tablet_schema->has_sequence_col();
+    if (schema_has_sequence_col) {
+        auto seq_col_idx = _tablet_schema->sequence_col_idx();
+        seq_col_unique_id = _tablet_schema->column(seq_col_idx).unique_id();
+        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                data.block->get_by_position(seq_col_idx), data.row_pos, data.num_rows,
+                seq_col_idx));
+        auto [status, column] = _olap_data_convertor->convert_column_data(seq_col_idx);
+        if (!status.ok()) {
+            return status;
+        }
+        seq_column = column;
+    }
+
+    DCHECK(_tablet_schema->has_skip_bitmap_col());
+    auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx();
+    std::vector<BitmapValue>* skip_bitmaps = &(
+            assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>(
+                    data.block->get_by_position(skip_bitmap_col_idx).column->assume_mutable().get())
+                    ->get_data());
+
+    bool has_default_or_nullable = false;
+    std::vector<bool> use_default_or_null_flag;
+    use_default_or_null_flag.reserve(data.num_rows);
+
+    const auto* delete_sign_column_data =
+            BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows);
+    DCHECK(delete_sign_column_data != nullptr);
+    int32_t delete_sign_col_unique_id =
+            _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
+
+    std::vector<RowsetSharedPtr> specified_rowsets;
+    {
+        DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
+                        { sleep(60); })
+        std::shared_lock rlock(_tablet->get_header_lock());
+        specified_rowsets = _mow_context->rowset_ptrs;
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+            // Only in strict mode partial update do missing rowsets here lead to problems.
+            // In other cases, the missing rowsets will be calculated in later phases (commit phase/publish phase)
+            LOG(WARNING) << fmt::format(
+                    "[Memtable Flush] some rowsets have been deleted due to "
+                    "compaction(specified_rowsets.size()={}, but rowset_ids.size()={}) in "
+                    "partial update. tablet_id: {}, cur max_version: {}, transaction_id: {}",
+                    specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
+                    _mow_context->max_version, _mow_context->txn_id);
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                return Status::InternalError<false>(
+                        "[Memtable Flush] some rowsets have been deleted due to "
+                        "compaction in strict mode partial update");
+            }
+        }
+    }
+    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
+
+    FlexibleReadPlan read_plan;
+
+    // locate rows in base data
+    int64_t num_rows_updated = 0;
+    int64_t num_rows_new_added = 0;
+    int64_t num_rows_deleted = 0;
+    int64_t num_rows_filtered = 0;
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
+        // block   segment
+        //   2   ->   0
+        //   3   ->   1
+        //   4   ->   2
+        //   5   ->   3
+        // here row_pos = 2, num_rows = 4.
+        size_t delta_pos = block_pos - data.row_pos;
+        size_t segment_pos = segment_start_pos + delta_pos;
+        auto& skip_bitmap = skip_bitmaps->at(block_pos);
+
+        // the hidden sequence column should have the same mark as the sequence map column
+        if (seq_map_col_unique_id != -1) {
+            DCHECK(schema_has_sequence_col);
+            if (skip_bitmap.contains(seq_map_col_unique_id)) {
+                skip_bitmap.add(seq_col_unique_id);
+            }
+        }
+
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        _maybe_invalid_row_cache(key);
+        bool row_has_sequence_col =
+                (schema_has_sequence_col && !skip_bitmap.contains(seq_col_unique_id));
+        if (row_has_sequence_col) {
+            _encode_seq_column(seq_column, delta_pos, &key);
+        }
+
+        // mark key with delete sign as deleted.
+        bool have_delete_sign = (!skip_bitmap.contains(delete_sign_col_unique_id) &&
+                                 delete_sign_column_data[block_pos] != 0);
+
+        RowLocation loc;
+        // save rowset shared ptr so this rowset wouldn't be deleted
+        RowsetSharedPtr rowset;
+        auto st = _tablet->lookup_row_key(key, row_has_sequence_col, specified_rowsets, &loc,
+                                          _mow_context->max_version, segment_caches, &rowset);
+        if (st.is<KEY_NOT_FOUND>()) {
+            if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
+                ++num_rows_filtered;
+                // delete the invalid newly inserted row
+                _mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
+                                                  DeleteBitmap::TEMP_VERSION_COMMON},
+                                                 segment_pos);
+            } else {
+                bool can_insert_new_rows_in_partial_update = true;
+                std::string error_column;
+                for (auto cid : _opts.rowset_ctx->partial_update_info->missing_cids) {
+                    const TabletColumn& col = _tablet_schema->column(cid);
+                    if (skip_bitmap.contains(col.unique_id()) && !col.has_default_value() &&
+                        !col.is_nullable()) {
+                        error_column = col.name();
+                        can_insert_new_rows_in_partial_update = false;
+                        break;
+                    }
+                }
+                if (!can_insert_new_rows_in_partial_update) {
+                    return Status::Error<INVALID_SCHEMA, false>(
+                            "the unmentioned column `{}` should have default value or be "
+                            "nullable for newly inserted rows in non-strict mode partial update",
+                            error_column);
+                }
+            }
+            ++num_rows_new_added;
+            has_default_or_nullable = true;
+            use_default_or_null_flag.emplace_back(true);
+            continue;
+        }
+        if (!st.ok() && !st.is<KEY_ALREADY_EXISTS>()) {
+            LOG(WARNING) << "failed to lookup row key, error: " << st;
+            return st;
+        }
+
+        // 1. if the delete sign is marked, it means that the value columns of the row will not
+        //    be read. So we don't need to read the missing values from the previous rows.
+        // 2. the one exception is when there are sequence columns in the table: we need to read
+        //    the sequence columns, otherwise it may cause the merge-on-read based compaction
+        //    policy to produce incorrect results
+        if (have_delete_sign && !schema_has_sequence_col) {
+            has_default_or_nullable = true;
+            use_default_or_null_flag.emplace_back(true);
+        } else {
+            // partial update should not contain invisible columns
+            use_default_or_null_flag.emplace_back(false);
+            _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
+            read_plan.prepare_to_read(loc, segment_pos, skip_bitmap);
+        }
+
+        if (st.is<KEY_ALREADY_EXISTS>()) {
+            // although we need to mark the current row as deleted, we still need to read the missing
+            // columns for this row, to ensure that each column is aligned
+            _mow_context->delete_bitmap->add(
+                    {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
+                    segment_pos);
+            ++num_rows_deleted;
+        } else {
+            _mow_context->delete_bitmap->add(
+                    {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
+            ++num_rows_updated;
+        }
+    }
+    CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);
+
+    if (config::enable_merge_on_write_correctness_check) {
+        _tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
+                                                    _mow_context->rowset_ids);
+    }
+
+    // read to fill full_block
+    RETURN_IF_ERROR(read_plan.fill_non_sort_key_columns(
+            _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block,
+            use_default_or_null_flag, has_default_or_nullable, segment_start_pos, data.row_pos,
+            data.block, skip_bitmaps));
+
+    // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation?
+    // this column is not needed in the read path for merge-on-write tables
+
+    // row column should be filled here
+    // convert block to row store format
+    _serialize_block_to_row_column(full_block);
+
+    // encode all non-key columns (including the sequence column if it exists) and append to column writers
+    for (auto cid = _num_sort_key_columns; cid < _tablet_schema->num_columns(); cid++) {
+        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
+                full_block.get_by_position(cid), data.row_pos, data.num_rows, cid));
+        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
+        if (!status.ok()) {
+            return status;
+        }
+        if (cid == _tablet_schema->sequence_col_idx()) {
+            // should use the latest encoded sequence column to build the primary key index
+            seq_column = column;
+        }
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
+        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
+                                                     data.num_rows));
+        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + data.num_rows);
+    }
+
+    _num_rows_updated += num_rows_updated;
+    _num_rows_deleted += num_rows_deleted;
+    _num_rows_new_added += num_rows_new_added;
+    _num_rows_filtered += num_rows_filtered;
+
+    if (_num_rows_written != data.row_pos ||
+        _primary_key_index_builder->num_rows() != _num_rows_written) {
+        return Status::InternalError(
+                "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
+                "index builder num rows: {}",
+                _num_rows_written, data.row_pos, _primary_key_index_builder->num_rows());
+    }
+
+    // build primary key index
+    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
+        size_t delta_pos = block_pos - data.row_pos;
+        std::string key = _full_encode_keys(key_columns, delta_pos);
+        if (schema_has_sequence_col) {
+            _encode_seq_column(seq_column, delta_pos, &key);
+        }
+        RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+    }
+
+    _num_rows_written += data.num_rows;
+    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
+            << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
+            << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
+    _olap_data_convertor->clear_source_content();
+    return Status::OK();
+}

Review Comment:
   too long function.
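
One way to act on this comment, sketched below with simplified placeholder types (Status, Batch) and hypothetical helper names that are not part of the PR, is to keep the entry point as a thin driver and move each numbered phase of the function into its own private helper:

    #include <cstddef>
    #include <string>

    // Minimal stand-ins for the real Doris types; names here are illustrative only.
    struct Status {
        bool ok = true;
        std::string msg;
        static Status OK() { return {}; }
    };

    struct Batch {
        std::size_t row_pos = 0;
        std::size_t num_rows = 0;
    };

    class SegmentWriterSketch {
    public:
        // Thin driver: each phase mirrors one numbered step of the original function.
        Status append_flexible_partial(const Batch& data) {
            if (Status st = encode_key_columns(data); !st.ok) return st;
            if (Status st = locate_rows_and_mark_delete_bitmap(data); !st.ok) return st;
            if (Status st = fill_and_encode_non_key_columns(data); !st.ok) return st;
            return build_primary_key_index(data);
        }

    private:
        // 1. convert and append the sort-key columns
        Status encode_key_columns(const Batch&) { return Status::OK(); }
        // 2. look up each row in the base data and update the delete bitmap
        Status locate_rows_and_mark_delete_bitmap(const Batch&) { return Status::OK(); }
        // 3. read missing cells via the read plan, then convert/append non-key columns
        Status fill_and_encode_non_key_columns(const Batch&) { return Status::OK(); }
        // 4. re-encode keys (with the final sequence column) into the primary key index
        Status build_primary_key_index(const Batch&) { return Status::OK(); }
    };

    int main() {
        SegmentWriterSketch writer;
        return writer.append_flexible_partial(Batch{0, 4}).ok ? 0 : 1;
    }

The split simply follows the phases the function's own comments already delineate (key encoding, row lookup and delete-bitmap marking, non-key fill, primary-key-index build), so each helper stays small and independently testable.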