gavinchou commented on code in PR #31853: URL: https://github.com/apache/doris/pull/31853#discussion_r1514051381
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -664,6 +739,13 @@ Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool ch // TODO write zonemap to meta rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0); rowset_meta->set_creation_time(time(nullptr)); + + if (auto seg_file_size = _seg_files.segments_file_size(); !seg_file_size.has_value()) { + LOG(ERROR) << seg_file_size.error(); Review Comment: Should we return an error status here instead of only logging the error? ########## be/src/olap/rowset/rowset_meta.h: ########## @@ -314,6 +314,12 @@ class RowsetMeta { void set_txn_expiration(int64_t expiration) { _rowset_meta_pb.set_txn_expiration(expiration); } + // `seg_file_size` MUST ordered by segment id + void set_segments_file_size(const std::vector<size_t>& seg_file_size); Review Comment: How do we guarantee this? > `seg_file_size` MUST ordered by segment id Should we check the order, or sort it, in `set_segments_file_size`? ########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -96,12 +97,92 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, } // namespace +SegmentFileCollection::~SegmentFileCollection() = default; + +Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { + std::lock_guard lock(_lock); + if (_closed) [[unlikely]] { + DCHECK(false) << writer->path(); + return Status::InternalError("add to closed SegmentFileCollection"); + } + + _file_writers.emplace_back(seg_id, std::move(writer)); + return Status::OK(); +} + +Status SegmentFileCollection::close() { + { + std::lock_guard lock(_lock); + if (_closed) [[unlikely]] { + DCHECK(false); + return Status::InternalError("double close SegmentFileCollection"); + } + _closed = true; + } + + for (auto&& [_, writer] : _file_writers) { + RETURN_IF_ERROR(writer->close()); + } + + return Status::OK(); +} + +Result<std::vector<size_t>> SegmentFileCollection::segments_file_size() { + std::lock_guard lock(_lock); + if (!_closed) [[unlikely]] { + DCHECK(false); 
return ResultError(Status::InternalError("get segments file size without closed")); + } + + Status st; + std::vector<size_t> seg_file_size(_file_writers.size(), 0); + bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { + auto&& [seg_id, writer] = it; + + if (seg_id > seg_file_size.size()) [[unlikely]] { Review Comment: It seems this should be `seg_id >= seg_file_size.size()`. A `seg_id` equal to the size would still index one past the end of `seg_file_size`. ########## be/src/olap/rowset/vertical_beta_rowset_writer.h: ########## @@ -17,28 +17,28 @@ #pragma once -#include <stddef.h> -#include <stdint.h> - #include <memory> +#include <type_traits> #include <vector> #include "common/status.h" #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/segment_v2/segment_writer.h" -#include "olap/rowset/vertical_beta_rowset_writer_helper.h" namespace doris { namespace vectorized { class Block; } // namespace vectorized // for vertical compaction -// TODO(plat1ko): Inherited from template type `T`, `T` is `BetaRowsetWriter` or `CloudBetaRowsetWriter` -class VerticalBetaRowsetWriter final : public BetaRowsetWriter { +template <class T> + requires std::is_base_of_v<BaseBetaRowsetWriter, T> +class VerticalBetaRowsetWriter final : public T { Review Comment: Does this compile with GCC?
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -96,12 +97,92 @@ void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, } // namespace +SegmentFileCollection::~SegmentFileCollection() = default; + +Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { + std::lock_guard lock(_lock); + if (_closed) [[unlikely]] { + DCHECK(false) << writer->path(); + return Status::InternalError("add to closed SegmentFileCollection"); + } + + _file_writers.emplace_back(seg_id, std::move(writer)); + return Status::OK(); +} + +Status SegmentFileCollection::close() { + { + std::lock_guard lock(_lock); + if (_closed) [[unlikely]] { + DCHECK(false); + return Status::InternalError("double close SegmentFileCollection"); + } + _closed = true; + } + + for (auto&& [_, writer] : _file_writers) { + RETURN_IF_ERROR(writer->close()); + } + + return Status::OK(); +} + +Result<std::vector<size_t>> SegmentFileCollection::segments_file_size() { + std::lock_guard lock(_lock); + if (!_closed) [[unlikely]] { + DCHECK(false); + return ResultError(Status::InternalError("get segments file size without closed")); + } + + Status st; + std::vector<size_t> seg_file_size(_file_writers.size(), 0); + bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { + auto&& [seg_id, writer] = it; + + if (seg_id > seg_file_size.size()) [[unlikely]] { + auto err_msg = fmt::format("invalid seg_id={} num_file_writers={} path={}", seg_id, + seg_file_size.size(), writer->path().native()); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + + auto& fsize = seg_file_size[seg_id]; + if (fsize != 0) { + // File size should not been set + auto err_msg = + fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + + fsize = writer->bytes_appended(); + if (fsize <= 0) { + auto err_msg = + fmt::format("invalid segment 
fsize={} path={}", fsize, writer->path().native()); + DCHECK(false) << err_msg; + st = Status::InternalError(err_msg); + return false; + } + + return true; + }); + + if (succ) { Review Comment: Try a ternary expression here to reduce the number of lines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org