This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 9944a8c3be2 [fix](branch-2.0)(profile) avoid update profile in deconstructor (#32013) 9944a8c3be2 is described below commit 9944a8c3be219170b979626dba120d81ae5a8c4c Author: Mingyu Chen <morning...@163.com> AuthorDate: Fri Jul 12 23:03:04 2024 +0800 [fix](branch-2.0)(profile) avoid update profile in deconstructor (#32013) bp #32131 --- be/src/exec/line_reader.h | 3 +- be/src/io/fs/buffered_reader.cpp | 24 ++++++- be/src/io/fs/buffered_reader.h | 45 ++++++++---- be/src/io/fs/file_reader.h | 3 +- be/src/io/fs/hdfs_file_reader.cpp | 74 ++++++++++--------- be/src/io/fs/hdfs_file_reader.h | 2 + .../line_reader.h => util/profile_collector.h} | 30 +++++--- .../file_reader/new_plain_text_line_reader.cpp | 7 ++ .../file_reader/new_plain_text_line_reader.h | 3 + be/src/vec/exec/format/generic_reader.h | 3 +- be/src/vec/exec/format/jni_reader.h | 7 ++ be/src/vec/exec/format/json/new_json_reader.cpp | 9 +++ be/src/vec/exec/format/json/new_json_reader.h | 3 + be/src/vec/exec/format/orc/vorc_reader.cpp | 16 ++++- be/src/vec/exec/format/orc/vorc_reader.h | 9 ++- .../exec/format/parquet/vparquet_group_reader.h | 9 ++- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 82 ++++++++++++---------- be/src/vec/exec/format/parquet/vparquet_reader.h | 5 ++ be/src/vec/exec/format/table/table_format_reader.h | 7 ++ be/src/vec/exec/jni_connector.cpp | 62 +++++++++------- be/src/vec/exec/jni_connector.h | 6 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +- be/src/vec/exec/scan/new_olap_scanner.h | 2 +- be/src/vec/exec/scan/vfile_scanner.cpp | 19 +++-- be/src/vec/exec/scan/vfile_scanner.h | 2 + be/src/vec/exec/scan/vscanner.cpp | 2 +- be/src/vec/exec/scan/vscanner.h | 4 +- 27 files changed, 298 insertions(+), 144 deletions(-) diff --git a/be/src/exec/line_reader.h b/be/src/exec/line_reader.h index 15e92b5f228..311bf2953c2 100644 --- a/be/src/exec/line_reader.h +++ b/be/src/exec/line_reader.h @@ -19,13 +19,14 @@ #include "common/factory_creator.h" #include "common/status.h" +#include "util/profile_collector.h" namespace doris { namespace io { class IOContext; } // This class is used to read content line by line -class LineReader { +class LineReader : public ProfileCollector { public: virtual ~LineReader() = default; virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof, diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 339ee9a8b11..b477a71d3dd 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -602,6 +602,9 @@ void PrefetchBuffer::close() { } _buffer_status = BufferStatus::CLOSED; _prefetched.notify_all(); +} + +void PrefetchBuffer::_collect_profile_before_close() { if (_sync_profile != nullptr) { _sync_profile(*this); } @@ -652,9 +655,6 @@ PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File } PrefetchBufferedReader::~PrefetchBufferedReader() { - /// set `_sync_profile` to nullptr to avoid updating counter after the runtime profile has been released. - std::for_each(_pre_buffers.begin(), _pre_buffers.end(), - [](std::shared_ptr<PrefetchBuffer>& buffer) { buffer->_sync_profile = nullptr; }); /// Better not to call virtual functions in a destructor. _close_internal(); } @@ -699,6 +699,17 @@ Status PrefetchBufferedReader::_close_internal() { return Status::OK(); } +void PrefetchBufferedReader::_collect_profile_before_close() { + std::for_each(_pre_buffers.begin(), _pre_buffers.end(), + [](std::shared_ptr<PrefetchBuffer>& buffer) { + buffer->collect_profile_before_close(); + }); + if (_reader != nullptr) { + _reader->collect_profile_before_close(); + } +} + +// InMemoryFileReader InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) { _size = _reader->size(); } @@ -735,6 +746,13 @@ Status InMemoryFileReader::read_at_impl(size_t offset, Slice result, size_t* byt return Status::OK(); } +void InMemoryFileReader::_collect_profile_before_close() { + if (_reader != nullptr) { + _reader->collect_profile_before_close(); + } +} + +// BufferedFileStreamReader BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length, size_t max_buf_size) : _file(file), diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index b7d5a39da72..a7f60e0db7a 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -180,17 +180,6 @@ public: Status close() override { if (!_closed) { _closed = true; - // the underlying buffer is closed in its own destructor - // return _reader->close(); - if (_profile != nullptr) { - COUNTER_UPDATE(_copy_time, _statistics.copy_time); - COUNTER_UPDATE(_read_time, _statistics.read_time); - COUNTER_UPDATE(_request_io, _statistics.request_io); - COUNTER_UPDATE(_merged_io, _statistics.merged_io); - COUNTER_UPDATE(_request_bytes, _statistics.request_bytes); - COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes); - COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes); - } } return Status::OK(); } @@ -219,6 +208,21 @@ protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; + void _collect_profile_before_close() override { + if (_profile != nullptr) { + COUNTER_UPDATE(_copy_time, _statistics.copy_time); + COUNTER_UPDATE(_read_time, _statistics.read_time); + COUNTER_UPDATE(_request_io, _statistics.request_io); + COUNTER_UPDATE(_merged_io, _statistics.merged_io); + COUNTER_UPDATE(_request_bytes, _statistics.request_bytes); + COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes); + COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes); + if (_reader != nullptr) { + _reader->collect_profile_before_close(); + } + } + } + private: RuntimeProfile::Counter* _copy_time; RuntimeProfile::Counter* _read_time; @@ -275,7 +279,7 @@ public: }; class PrefetchBufferedReader; -struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> { +struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public ProfileCollector { enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED }; PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size, @@ -355,6 +359,10 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> { int search_read_range(size_t off) const; size_t merge_small_ranges(size_t off, int range_index) const; + + void _collect_profile_at_runtime() override {} + + void _collect_profile_before_close() override; }; /** @@ -400,6 +408,8 @@ protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; + void _collect_profile_before_close() override; + private: Status _close_internal(); size_t get_buffer_pos(int64_t position) const { @@ -454,6 +464,8 @@ protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; + void _collect_profile_before_close() override; + private: Status _close_internal(); io::FileReaderSPtr _reader; @@ -494,7 +506,7 @@ protected: Statistics _statistics; }; -class BufferedFileStreamReader : public BufferedStreamReader { +class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector { public: BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length, size_t max_buf_size); @@ -505,6 +517,13 @@ public: Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override; std::string path() override { return _file->path(); } +protected: + void _collect_profile_before_close() override { + if (_file != nullptr) { + _file->collect_profile_before_close(); + } + } + private: std::unique_ptr<uint8_t[]> _buf; io::FileReaderSPtr _file; diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index d4f7c5d5302..2e398ebfabf 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "io/fs/path.h" +#include "util/profile_collector.h" #include "util/slice.h" namespace doris { @@ -33,7 +34,7 @@ namespace io { class FileSystem; class IOContext; -class FileReader { +class FileReader : public doris::ProfileCollector { public: FileReader() = default; virtual ~FileReader() = default; diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index d344447ae54..dc103334e6d 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -77,41 +77,6 @@ Status HdfsFileReader::close() { bool expected = false; if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { DorisMetrics::instance()->hdfs_file_open_reading->increment(-1); - if (_profile != nullptr && is_hdfs(_name_node)) { -#ifdef USE_HADOOP_HDFS - struct hdfsReadStatistics* hdfs_statistics = nullptr; - auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics); - if (r != 0) { - return Status::InternalError( - fmt::format("Failed to run hdfsFileGetReadStatistics(): {}", r)); - } - COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead); - COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, - hdfs_statistics->totalLocalBytesRead); - COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read, - hdfs_statistics->totalShortCircuitBytesRead); - COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read, - hdfs_statistics->totalZeroCopyBytesRead); - hdfsFileFreeReadStatistics(hdfs_statistics); - - struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr; - r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics); - if (r != 0) { - return Status::InternalError( - fmt::format("Failed to run hdfsGetHedgedReadMetrics(): {}", r)); - } - - COUNTER_UPDATE(_hdfs_profile.total_hedged_read, - hdfs_hedged_read_statistics->hedgedReadOps); - COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread, - hdfs_hedged_read_statistics->hedgedReadOpsInCurThread); - COUNTER_UPDATE(_hdfs_profile.hedged_read_wins, - hdfs_hedged_read_statistics->hedgedReadOpsWin); - - hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics); - hdfsFileClearReadStatistics(_handle->file()); -#endif - } } return Status::OK(); } @@ -212,5 +177,44 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r return Status::OK(); } #endif + +void HdfsFileReader::_collect_profile_before_close() { + if (_profile != nullptr && is_hdfs(_name_node)) { +#ifdef USE_HADOOP_HDFS + struct hdfsReadStatistics* hdfs_statistics = nullptr; + auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics); + if (r != 0) { + LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r + << ", name node: " << _name_node; + return; + } + COUNTER_UPDATE(_hdfs_profile.total_bytes_read, hdfs_statistics->totalBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, hdfs_statistics->totalLocalBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read, + hdfs_statistics->totalShortCircuitBytesRead); + COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read, + hdfs_statistics->totalZeroCopyBytesRead); + hdfsFileFreeReadStatistics(hdfs_statistics); + + struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr; + r = hdfsGetHedgedReadMetrics(_handle->fs(), &hdfs_hedged_read_statistics); + if (r != 0) { + LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r + << ", name node: " << _name_node; + return; + } + + COUNTER_UPDATE(_hdfs_profile.total_hedged_read, hdfs_hedged_read_statistics->hedgedReadOps); + COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread, + hdfs_hedged_read_statistics->hedgedReadOpsInCurThread); + COUNTER_UPDATE(_hdfs_profile.hedged_read_wins, + hdfs_hedged_read_statistics->hedgedReadOpsWin); + + hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics); + hdfsFileClearReadStatistics(_handle->file()); +#endif + } +} + } // namespace io } // namespace doris diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h index 4e093e1c9b1..48c13789f89 100644 --- a/be/src/io/fs/hdfs_file_reader.h +++ b/be/src/io/fs/hdfs_file_reader.h @@ -57,6 +57,8 @@ protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; + void _collect_profile_before_close() override; + private: #ifdef USE_HADOOP_HDFS struct HDFSProfile { diff --git a/be/src/exec/line_reader.h b/be/src/util/profile_collector.h similarity index 61% copy from be/src/exec/line_reader.h copy to be/src/util/profile_collector.h index 15e92b5f228..abdccaceb4e 100644 --- a/be/src/exec/line_reader.h +++ b/be/src/util/profile_collector.h @@ -17,21 +17,29 @@ #pragma once -#include "common/factory_creator.h" -#include "common/status.h" +#include <atomic> namespace doris { -namespace io { -class IOContext; -} -// This class is used to read content line by line -class LineReader { + +class ProfileCollector { public: - virtual ~LineReader() = default; - virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof, - const io::IOContext* io_ctx) = 0; + void collect_profile_at_runtime() { _collect_profile_at_runtime(); } + + void collect_profile_before_close() { + bool expected = false; + if (_collected.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + _collect_profile_before_close(); + } + } + + virtual ~ProfileCollector() {} + +protected: + virtual void _collect_profile_at_runtime() {} + virtual void _collect_profile_before_close() {} - virtual void close() = 0; +private: + std::atomic<bool> _collected = false; }; } // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp index 04953a2b54d..8dce6e589af 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp @@ -479,4 +479,11 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool return Status::OK(); } + +void NewPlainTextLineReader::_collect_profile_before_close() { + if (_file_reader != nullptr) { + _file_reader->collect_profile_before_close(); + } +} + } // namespace doris diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h index 8dadee1ba64..462ff2d5734 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h @@ -196,6 +196,9 @@ public: void close() override; +protected: + void _collect_profile_before_close() override; + private: bool update_eof(); diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 7842f2edb92..e4ee898db13 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -22,6 +22,7 @@ #include "common/factory_creator.h" #include "common/status.h" #include "runtime/types.h" +#include "util/profile_collector.h" #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { @@ -30,7 +31,7 @@ class Block; // This a reader interface for all file readers. // A GenericReader is responsible for reading a file and return // a set of blocks with specified schema, -class GenericReader { +class GenericReader : public ProfileCollector { public: GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {} void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) { diff --git a/be/src/vec/exec/format/jni_reader.h b/be/src/vec/exec/format/jni_reader.h index 500301bd4b6..31ccf4af406 100644 --- a/be/src/vec/exec/format/jni_reader.h +++ b/be/src/vec/exec/format/jni_reader.h @@ -63,6 +63,13 @@ public: Status init_reader( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); +protected: + void _collect_profile_before_close() override { + if (_jni_connector != nullptr) { + _jni_connector->collect_profile_before_close(); + } + } + private: const std::vector<SlotDescriptor*>& _file_slot_descs; RuntimeState* _state; diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index ba7488da4d0..a41fa3881b1 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -1793,4 +1793,13 @@ Status NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, return Status::OK(); } +void NewJsonReader::_collect_profile_before_close() { + if (_line_reader != nullptr) { + _line_reader->collect_profile_before_close(); + } + if (_file_reader != nullptr) { + _file_reader->collect_profile_before_close(); + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index 16b51d63f41..9d4696a6422 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -94,6 +94,9 @@ public: Status get_parsed_schema(std::vector<std::string>* col_names, std::vector<TypeDescriptor>* col_types) override; +protected: + void _collect_profile_before_close() override; + private: Status _get_range_params(); void _init_system_properties(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 8bf7fe3f2d6..46512170a45 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -180,13 +180,12 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r } OrcReader::~OrcReader() { - _collect_profile_on_close(); if (_obj_pool && _obj_pool.get()) { _obj_pool->clear(); } } -void OrcReader::_collect_profile_on_close() { +void OrcReader::_collect_profile_before_close() { if (_profile != nullptr) { COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time); COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls); @@ -198,6 +197,10 @@ void OrcReader::_collect_profile_on_close() { COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time); COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time); COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time); + + if (_file_input_stream != nullptr) { + _file_input_stream->collect_profile_before_close(); + } } } @@ -2247,6 +2250,9 @@ MutableColumnPtr OrcReader::_convert_dict_column_to_string_column( void ORCFileInputStream::beforeReadStripe( std::unique_ptr<orc::StripeInformation> current_strip_information, std::vector<bool> selected_columns) { + if (_file_reader != nullptr) { + _file_reader->collect_profile_before_close(); + } // Generate prefetch ranges, build stripe file reader. uint64_t offset = current_strip_information->getOffset(); std::vector<io::PrefetchRange> prefetch_ranges; @@ -2274,4 +2280,10 @@ void ORCFileInputStream::beforeReadStripe( } } +void ORCFileInputStream::_collect_profile_before_close() { + if (_file_reader != nullptr) { + _file_reader->collect_profile_before_close(); + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index b87ccb823fc..4cb5361b8d8 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -196,6 +196,9 @@ public: std::unordered_map<std::string, orc::StringDictionary*>& column_name_to_dict_map, bool* is_stripe_filtered); +protected: + void _collect_profile_before_close() override; + private: struct OrcProfile { RuntimeProfile::Counter* read_time; @@ -564,7 +567,7 @@ private: std::vector<orc::TypeKind>* _unsupported_pushdown_types; }; -class ORCFileInputStream : public orc::InputStream { +class ORCFileInputStream : public orc::InputStream, public ProfileCollector { public: ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader, OrcReader::Statistics* statistics, const io::IOContext* io_ctx, @@ -589,6 +592,10 @@ public: void beforeReadStripe(std::unique_ptr<orc::StripeInformation> current_strip_information, std::vector<bool> selected_columns) override; +protected: + void _collect_profile_at_runtime() override {}; + void _collect_profile_before_close() override; + private: const std::string& _file_name; io::FileReaderSPtr _inner_reader; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 0063fca768d..7a8f71da6cf 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -64,7 +64,7 @@ namespace doris::vectorized { // TODO: we need to determine it by test. static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max(); -class RowGroupReader { +class RowGroupReader : public ProfileCollector { public: static const std::vector<int64_t> NO_DELETE; @@ -163,6 +163,13 @@ public: void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } int64_t get_remaining_rows() { return _remaining_rows; } +protected: + void _collect_profile_before_close() override { + if (_file_reader != nullptr) { + _file_reader->collect_profile_before_close(); + } + } + private: void _merge_read_ranges(std::vector<RowRange>& row_ranges); Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index e0bd043c713..ff1e1afff4b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -173,43 +173,6 @@ void ParquetReader::close() { void ParquetReader::_close_internal() { if (!_closed) { - if (_profile != nullptr) { - COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups); - COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups); - COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows); - COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows); - COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows, - _statistics.lazy_read_filtered_rows); - COUNTER_UPDATE(_parquet_profile.filtered_bytes, _statistics.filtered_bytes); - COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows); - COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes); - COUNTER_UPDATE(_parquet_profile.column_read_time, _statistics.column_read_time); - COUNTER_UPDATE(_parquet_profile.parse_meta_time, _statistics.parse_meta_time); - COUNTER_UPDATE(_parquet_profile.parse_footer_time, _statistics.parse_footer_time); - COUNTER_UPDATE(_parquet_profile.open_file_time, _statistics.open_file_time); - COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num); - COUNTER_UPDATE(_parquet_profile.page_index_filter_time, - _statistics.page_index_filter_time); - COUNTER_UPDATE(_parquet_profile.row_group_filter_time, - _statistics.row_group_filter_time); - - COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time); - COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls); - COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, - _column_statistics.meta_read_calls); - COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes); - COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); - COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt); - COUNTER_UPDATE(_parquet_profile.decode_header_time, - _column_statistics.decode_header_time); - COUNTER_UPDATE(_parquet_profile.decode_value_time, - _column_statistics.decode_value_time); - COUNTER_UPDATE(_parquet_profile.decode_dict_time, _column_statistics.decode_dict_time); - COUNTER_UPDATE(_parquet_profile.decode_level_time, - _column_statistics.decode_level_time); - COUNTER_UPDATE(_parquet_profile.decode_null_map_time, - _column_statistics.decode_null_map_time); - } _closed = true; } } @@ -592,6 +555,9 @@ RowGroupReader::PositionDeleteContext ParquetReader::_get_position_delete_ctx( } Status ParquetReader::_next_row_group_reader() { + if (_current_group_reader != nullptr) { + _current_group_reader->collect_profile_before_close(); + } if (_read_row_groups.empty()) { _row_group_eof = true; _current_group_reader.reset(nullptr); @@ -934,4 +900,46 @@ int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& } return column.data_page_offset; } + +void ParquetReader::_collect_profile() { + if (_profile == nullptr) { + return; + } + + if (_current_group_reader != nullptr) { + _current_group_reader->collect_profile_before_close(); + } + COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups); + COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups); + COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows); + COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows); + COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows); + COUNTER_UPDATE(_parquet_profile.filtered_bytes, _statistics.filtered_bytes); + COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows); + COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes); + COUNTER_UPDATE(_parquet_profile.column_read_time, _statistics.column_read_time); + COUNTER_UPDATE(_parquet_profile.parse_meta_time, _statistics.parse_meta_time); + COUNTER_UPDATE(_parquet_profile.parse_footer_time, _statistics.parse_footer_time); + COUNTER_UPDATE(_parquet_profile.open_file_time, _statistics.open_file_time); + COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num); + COUNTER_UPDATE(_parquet_profile.page_index_filter_time, _statistics.page_index_filter_time); + COUNTER_UPDATE(_parquet_profile.row_group_filter_time, _statistics.row_group_filter_time); + + COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time); + COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls); + COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls); + COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes); + COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); + COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt); + COUNTER_UPDATE(_parquet_profile.decode_header_time, _column_statistics.decode_header_time); + COUNTER_UPDATE(_parquet_profile.decode_value_time, _column_statistics.decode_value_time); + COUNTER_UPDATE(_parquet_profile.decode_dict_time, _column_statistics.decode_dict_time); + COUNTER_UPDATE(_parquet_profile.decode_level_time, _column_statistics.decode_level_time); + COUNTER_UPDATE(_parquet_profile.decode_null_map_time, _column_statistics.decode_null_map_time); +} + +void ParquetReader::_collect_profile_before_close() { + _collect_profile(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 679b06fa296..bdc58669ce5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -152,6 +152,9 @@ public: _table_col_to_file_col = map; } +protected: + void _collect_profile_before_close() override; + private: struct ParquetProfile { RuntimeProfile::Counter* filtered_row_groups; @@ -211,7 +214,9 @@ private: std::string _meta_cache_key(const std::string& path) { return "meta_" + path; } std::vector<io::PrefetchRange> _generate_random_access_ranges( const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size); + void _collect_profile(); +private: RuntimeProfile* _profile; const TFileScanRangeParams& _scan_params; const TFileRangeDesc& _scan_range; diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index 5ce9856ad8a..9426d116334 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ b/be/src/vec/exec/format/table/table_format_reader.h @@ -57,6 +57,13 @@ public: virtual Status init_row_filters(const TFileRangeDesc& range) = 0; +protected: + void _collect_profile_before_close() override { + if (_file_format_reader != nullptr) { + _file_format_reader->collect_profile_before_close(); + } + } + protected: std::string _table_format; // hudi, iceberg std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 218bd260455..a639249cb35 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -160,32 +160,6 @@ Status JniConnector::close() { JNIEnv* env = nullptr; RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); if (_scanner_opened) { - // update scanner metrics - for (const auto& metric : get_statistics(env)) { - std::vector<std::string> type_and_name = split(metric.first, ":"); - if (type_and_name.size() != 2) { - LOG(WARNING) << "Name of JNI Scanner metric should be pattern like " - << "'metricType:metricName'"; - continue; - } - long metric_value = std::stol(metric.second); - RuntimeProfile::Counter* scanner_counter; - if (type_and_name[0] == "timer") { - scanner_counter = - ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str()); - } else if (type_and_name[0] == "counter") { - scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT, - _connector_name.c_str()); - } else if (type_and_name[0] == "bytes") { - scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES, - _connector_name.c_str()); - } else { - LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes"; - continue; - } - COUNTER_UPDATE(scanner_counter, metric_value); - } - // _fill_block may be failed and returned, we should release table in close. // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); @@ -617,4 +591,40 @@ Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& m memcpy(meta.get(), &meta_data[0], meta_data.size() * 8); return Status::OK(); } + +void JniConnector::_collect_profile_before_close() { + if (_scanner_opened && _profile != nullptr) { + JNIEnv* env = nullptr; + Status st = JniUtil::GetJNIEnv(&env); + if (!st) { + LOG(WARNING) << "failed to get jni env when collect profile: " << st; + return; + } + // update scanner metrics + for (const auto& metric : get_statistics(env)) { + std::vector<std::string> type_and_name = split(metric.first, ":"); + if (type_and_name.size() != 2) { + LOG(WARNING) << "Name of JNI Scanner metric should be pattern like " + << "'metricType:metricName'"; + continue; + } + long metric_value = std::stol(metric.second); + RuntimeProfile::Counter* scanner_counter; + if (type_and_name[0] == "timer") { + scanner_counter = + ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str()); + } else if (type_and_name[0] == "counter") { + scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT, + _connector_name.c_str()); + } else if (type_and_name[0] == "bytes") { + scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES, + _connector_name.c_str()); + } else { + LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes"; + continue; + } + COUNTER_UPDATE(scanner_counter, metric_value); + } + } +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index c053c8a1f61..3bb39927f59 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -33,6 +33,7 @@ #include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" #include "runtime/types.h" +#include "util/profile_collector.h" #include "util/runtime_profile.h" #include "util/string_util.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -58,7 +59,7 @@ namespace doris::vectorized { /** * Connector to java jni scanner, which should extend org.apache.doris.common.jni.JniScanner */ -class JniConnector { +class JniConnector : public ProfileCollector { public: /** * The predicates that can be pushed down to java side. @@ -243,6 +244,9 @@ public: static Status generate_meta_info(Block* block, std::unique_ptr<long[]>& meta); +protected: + void _collect_profile_before_close() override; + private: std::string _connector_name; std::string _connector_class; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index ddecaccd195..b0e0ea6cc23 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -499,14 +499,14 @@ void NewOlapScanner::_update_realtime_counters() { _tablet_reader->mutable_stats()->raw_rows_read = 0; } -void NewOlapScanner::_update_counters_before_close() { +void NewOlapScanner::_collect_profile_before_close() { // Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside. if (_has_updated_counter) { return; } _has_updated_counter = true; - VScanner::_update_counters_before_close(); + VScanner::_collect_profile_before_close(); // Update counters for NewOlapScanner NewOlapScanNode* olap_parent = (NewOlapScanNode*)_parent; diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 6ecb6a1dc5d..debf9f8bfc7 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -78,7 +78,7 @@ public: protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; - void _update_counters_before_close() override; + void _collect_profile_before_close() override; private: void _update_realtime_counters(); diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 6143fbd2681..34e8acea8dd 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -664,6 +664,7 @@ void VFileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int l Status VFileScanner::_get_next_reader() { while (true) { if (_cur_reader) { + _cur_reader->collect_profile_before_close(); _cur_reader->close(); } _cur_reader.reset(nullptr); @@ -1070,11 +1071,6 @@ Status VFileScanner::close(RuntimeState* state) { return Status::OK(); } - if (config::enable_file_cache && _state->query_options().enable_file_cache) { - io::FileCacheProfileReporter cache_profile(_profile); - cache_profile.update(_file_cache_statistics.get()); - } - if (_cur_reader) { _cur_reader->close(); } @@ -1090,4 +1086,17 @@ void VFileScanner::try_stop() { } } +void VFileScanner::_collect_profile_before_close() { + VScanner::_collect_profile_before_close(); + if (config::enable_file_cache && _state->query_options().enable_file_cache && + _profile != nullptr) { + io::FileCacheProfileReporter cache_profile(_profile); + cache_profile.update(_file_cache_statistics.get()); + } + + if (_cur_reader != nullptr) { + _cur_reader->collect_profile_before_close(); + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index 0344b04fc1c..f972337fe29 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -91,6 +91,8 @@ protected: // TODO: cast input block columns type to string. Status _cast_src_block(Block* block) { return Status::OK(); } + void _collect_profile_before_close() override; + protected: std::unique_ptr<TextConverter> _text_converter; const TFileScanRangeParams* _params; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 82e81bb7ae0..2408f7ec709 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -167,7 +167,7 @@ Status VScanner::close(RuntimeState* state) { return Status::OK(); } -void VScanner::_update_counters_before_close() { +void VScanner::_collect_profile_before_close() { if (_parent) { COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer); COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read); diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 7d4b36ff44d..7d5ba2fb782 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -79,7 +79,7 @@ protected: virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0; // Update the counters before closing this scanner - virtual void _update_counters_before_close(); + virtual void _collect_profile_before_close(); // Filter the output block finally. Status _filter_output_block(Block* block); @@ -137,7 +137,7 @@ public: // update counters. For example, update counters depend on scanner's tablet, but // the tablet == null when init failed. if (_is_open) { - _update_counters_before_close(); + _collect_profile_before_close(); } _need_to_close = true; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org