This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 9944a8c3be2 [fix](branch-2.0)(profile) avoid update profile in deconstructor (#32013)
9944a8c3be2 is described below

commit 9944a8c3be219170b979626dba120d81ae5a8c4c
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Fri Jul 12 23:03:04 2024 +0800

    [fix](branch-2.0)(profile) avoid update profile in deconstructor (#32013)
    
    bp #32131
---
 be/src/exec/line_reader.h                          |  3 +-
 be/src/io/fs/buffered_reader.cpp                   | 24 ++++++-
 be/src/io/fs/buffered_reader.h                     | 45 ++++++++----
 be/src/io/fs/file_reader.h                         |  3 +-
 be/src/io/fs/hdfs_file_reader.cpp                  | 74 ++++++++++---------
 be/src/io/fs/hdfs_file_reader.h                    |  2 +
 .../line_reader.h => util/profile_collector.h}     | 30 +++++---
 .../file_reader/new_plain_text_line_reader.cpp     |  7 ++
 .../file_reader/new_plain_text_line_reader.h       |  3 +
 be/src/vec/exec/format/generic_reader.h            |  3 +-
 be/src/vec/exec/format/jni_reader.h                |  7 ++
 be/src/vec/exec/format/json/new_json_reader.cpp    |  9 +++
 be/src/vec/exec/format/json/new_json_reader.h      |  3 +
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 16 ++++-
 be/src/vec/exec/format/orc/vorc_reader.h           |  9 ++-
 .../exec/format/parquet/vparquet_group_reader.h    |  9 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 82 ++++++++++++----------
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  5 ++
 be/src/vec/exec/format/table/table_format_reader.h |  7 ++
 be/src/vec/exec/jni_connector.cpp                  | 62 +++++++++-------
 be/src/vec/exec/jni_connector.h                    |  6 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  4 +-
 be/src/vec/exec/scan/new_olap_scanner.h            |  2 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             | 19 +++--
 be/src/vec/exec/scan/vfile_scanner.h               |  2 +
 be/src/vec/exec/scan/vscanner.cpp                  |  2 +-
 be/src/vec/exec/scan/vscanner.h                    |  4 +-
 27 files changed, 298 insertions(+), 144 deletions(-)

diff --git a/be/src/exec/line_reader.h b/be/src/exec/line_reader.h
index 15e92b5f228..311bf2953c2 100644
--- a/be/src/exec/line_reader.h
+++ b/be/src/exec/line_reader.h
@@ -19,13 +19,14 @@
 
 #include "common/factory_creator.h"
 #include "common/status.h"
+#include "util/profile_collector.h"
 
 namespace doris {
 namespace io {
 class IOContext;
 }
 // This class is used to read content line by line
-class LineReader {
+class LineReader : public ProfileCollector {
 public:
     virtual ~LineReader() = default;
     virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof,
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 339ee9a8b11..b477a71d3dd 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -602,6 +602,9 @@ void PrefetchBuffer::close() {
     }
     _buffer_status = BufferStatus::CLOSED;
     _prefetched.notify_all();
+}
+
+void PrefetchBuffer::_collect_profile_before_close() {
     if (_sync_profile != nullptr) {
         _sync_profile(*this);
     }
@@ -652,9 +655,6 @@ 
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
 }
 
 PrefetchBufferedReader::~PrefetchBufferedReader() {
-    /// set `_sync_profile` to nullptr to avoid updating counter after the 
runtime profile has been released.
-    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
-                  [](std::shared_ptr<PrefetchBuffer>& buffer) { 
buffer->_sync_profile = nullptr; });
     /// Better not to call virtual functions in a destructor.
     _close_internal();
 }
@@ -699,6 +699,17 @@ Status PrefetchBufferedReader::_close_internal() {
     return Status::OK();
 }
 
+void PrefetchBufferedReader::_collect_profile_before_close() {
+    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
+                  [](std::shared_ptr<PrefetchBuffer>& buffer) {
+                      buffer->collect_profile_before_close();
+                  });
+    if (_reader != nullptr) {
+        _reader->collect_profile_before_close();
+    }
+}
+
+// InMemoryFileReader
 InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : 
_reader(std::move(reader)) {
     _size = _reader->size();
 }
@@ -735,6 +746,13 @@ Status InMemoryFileReader::read_at_impl(size_t offset, 
Slice result, size_t* byt
     return Status::OK();
 }
 
+void InMemoryFileReader::_collect_profile_before_close() {
+    if (_reader != nullptr) {
+        _reader->collect_profile_before_close();
+    }
+}
+
+// BufferedFileStreamReader
 BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, 
uint64_t offset,
                                                    uint64_t length, size_t 
max_buf_size)
         : _file(file),
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index b7d5a39da72..a7f60e0db7a 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -180,17 +180,6 @@ public:
     Status close() override {
         if (!_closed) {
             _closed = true;
-            // the underlying buffer is closed in its own destructor
-            // return _reader->close();
-            if (_profile != nullptr) {
-                COUNTER_UPDATE(_copy_time, _statistics.copy_time);
-                COUNTER_UPDATE(_read_time, _statistics.read_time);
-                COUNTER_UPDATE(_request_io, _statistics.request_io);
-                COUNTER_UPDATE(_merged_io, _statistics.merged_io);
-                COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
-                COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
-                COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
-            }
         }
         return Status::OK();
     }
@@ -219,6 +208,21 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override {
+        if (_profile != nullptr) {
+            COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+            COUNTER_UPDATE(_read_time, _statistics.read_time);
+            COUNTER_UPDATE(_request_io, _statistics.request_io);
+            COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+            COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+            COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+            COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
+            if (_reader != nullptr) {
+                _reader->collect_profile_before_close();
+            }
+        }
+    }
+
 private:
     RuntimeProfile::Counter* _copy_time;
     RuntimeProfile::Counter* _read_time;
@@ -275,7 +279,7 @@ public:
 };
 
 class PrefetchBufferedReader;
-struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> {
+struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public 
ProfileCollector {
     enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
 
     PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t 
whole_buffer_size,
@@ -355,6 +359,10 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer> {
     int search_read_range(size_t off) const;
 
     size_t merge_small_ranges(size_t off, int range_index) const;
+
+    void _collect_profile_at_runtime() override {}
+
+    void _collect_profile_before_close() override;
 };
 
 /**
@@ -400,6 +408,8 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
     Status _close_internal();
     size_t get_buffer_pos(int64_t position) const {
@@ -454,6 +464,8 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
     Status _close_internal();
     io::FileReaderSPtr _reader;
@@ -494,7 +506,7 @@ protected:
     Statistics _statistics;
 };
 
-class BufferedFileStreamReader : public BufferedStreamReader {
+class BufferedFileStreamReader : public BufferedStreamReader, public 
ProfileCollector {
 public:
     BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, 
uint64_t length,
                              size_t max_buf_size);
@@ -505,6 +517,13 @@ public:
     Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) 
override;
     std::string path() override { return _file->path(); }
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_file != nullptr) {
+            _file->collect_profile_before_close();
+        }
+    }
+
 private:
     std::unique_ptr<uint8_t[]> _buf;
     io::FileReaderSPtr _file;
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index d4f7c5d5302..2e398ebfabf 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -24,6 +24,7 @@
 
 #include "common/status.h"
 #include "io/fs/path.h"
+#include "util/profile_collector.h"
 #include "util/slice.h"
 
 namespace doris {
@@ -33,7 +34,7 @@ namespace io {
 class FileSystem;
 class IOContext;
 
-class FileReader {
+class FileReader : public doris::ProfileCollector {
 public:
     FileReader() = default;
     virtual ~FileReader() = default;
diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp
index d344447ae54..dc103334e6d 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -77,41 +77,6 @@ Status HdfsFileReader::close() {
     bool expected = false;
     if (_closed.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
         DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
-        if (_profile != nullptr && is_hdfs(_name_node)) {
-#ifdef USE_HADOOP_HDFS
-            struct hdfsReadStatistics* hdfs_statistics = nullptr;
-            auto r = hdfsFileGetReadStatistics(_handle->file(), 
&hdfs_statistics);
-            if (r != 0) {
-                return Status::InternalError(
-                        fmt::format("Failed to run 
hdfsFileGetReadStatistics(): {}", r));
-            }
-            COUNTER_UPDATE(_hdfs_profile.total_bytes_read, 
hdfs_statistics->totalBytesRead);
-            COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
-                           hdfs_statistics->totalLocalBytesRead);
-            COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
-                           hdfs_statistics->totalShortCircuitBytesRead);
-            COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
-                           hdfs_statistics->totalZeroCopyBytesRead);
-            hdfsFileFreeReadStatistics(hdfs_statistics);
-
-            struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = 
nullptr;
-            r = hdfsGetHedgedReadMetrics(_handle->fs(), 
&hdfs_hedged_read_statistics);
-            if (r != 0) {
-                return Status::InternalError(
-                        fmt::format("Failed to run hdfsGetHedgedReadMetrics(): 
{}", r));
-            }
-
-            COUNTER_UPDATE(_hdfs_profile.total_hedged_read,
-                           hdfs_hedged_read_statistics->hedgedReadOps);
-            COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
-                           
hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
-            COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
-                           hdfs_hedged_read_statistics->hedgedReadOpsWin);
-
-            hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
-            hdfsFileClearReadStatistics(_handle->file());
-#endif
-        }
     }
     return Status::OK();
 }
@@ -212,5 +177,44 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
     return Status::OK();
 }
 #endif
+
+void HdfsFileReader::_collect_profile_before_close() {
+    if (_profile != nullptr && is_hdfs(_name_node)) {
+#ifdef USE_HADOOP_HDFS
+        struct hdfsReadStatistics* hdfs_statistics = nullptr;
+        auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
+        if (r != 0) {
+            LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r
+                         << ", name node: " << _name_node;
+            return;
+        }
+        COUNTER_UPDATE(_hdfs_profile.total_bytes_read, 
hdfs_statistics->totalBytesRead);
+        COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, 
hdfs_statistics->totalLocalBytesRead);
+        COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
+                       hdfs_statistics->totalShortCircuitBytesRead);
+        COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
+                       hdfs_statistics->totalZeroCopyBytesRead);
+        hdfsFileFreeReadStatistics(hdfs_statistics);
+
+        struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
+        r = hdfsGetHedgedReadMetrics(_handle->fs(), 
&hdfs_hedged_read_statistics);
+        if (r != 0) {
+            LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r
+                         << ", name node: " << _name_node;
+            return;
+        }
+
+        COUNTER_UPDATE(_hdfs_profile.total_hedged_read, 
hdfs_hedged_read_statistics->hedgedReadOps);
+        COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
+                       hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
+        COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
+                       hdfs_hedged_read_statistics->hedgedReadOpsWin);
+
+        hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
+        hdfsFileClearReadStatistics(_handle->file());
+#endif
+    }
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 4e093e1c9b1..48c13789f89 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -57,6 +57,8 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
 #ifdef USE_HADOOP_HDFS
     struct HDFSProfile {
diff --git a/be/src/exec/line_reader.h b/be/src/util/profile_collector.h
similarity index 61%
copy from be/src/exec/line_reader.h
copy to be/src/util/profile_collector.h
index 15e92b5f228..abdccaceb4e 100644
--- a/be/src/exec/line_reader.h
+++ b/be/src/util/profile_collector.h
@@ -17,21 +17,29 @@
 
 #pragma once
 
-#include "common/factory_creator.h"
-#include "common/status.h"
+#include <atomic>
 
 namespace doris {
-namespace io {
-class IOContext;
-}
-// This class is used to read content line by line
-class LineReader {
+
+class ProfileCollector {
 public:
-    virtual ~LineReader() = default;
-    virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof,
-                             const io::IOContext* io_ctx) = 0;
+    void collect_profile_at_runtime() { _collect_profile_at_runtime(); }
+
+    void collect_profile_before_close() {
+        bool expected = false;
+        if (_collected.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
+            _collect_profile_before_close();
+        }
+    }
+
+    virtual ~ProfileCollector() {}
+
+protected:
+    virtual void _collect_profile_at_runtime() {}
+    virtual void _collect_profile_before_close() {}
 
-    virtual void close() = 0;
+private:
+    std::atomic<bool> _collected = false;
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index 04953a2b54d..8dce6e589af 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -479,4 +479,11 @@ Status NewPlainTextLineReader::read_line(const uint8_t** 
ptr, size_t* size, bool
 
     return Status::OK();
 }
+
+void NewPlainTextLineReader::_collect_profile_before_close() {
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index 8dadee1ba64..462ff2d5734 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -196,6 +196,9 @@ public:
 
     void close() override;
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     bool update_eof();
 
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index 7842f2edb92..e4ee898db13 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -22,6 +22,7 @@
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "runtime/types.h"
+#include "util/profile_collector.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
@@ -30,7 +31,7 @@ class Block;
 // This a reader interface for all file readers.
 // A GenericReader is responsible for reading a file and return
 // a set of blocks with specified schema,
-class GenericReader {
+class GenericReader : public ProfileCollector {
 public:
     GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}
     void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
diff --git a/be/src/vec/exec/format/jni_reader.h b/be/src/vec/exec/format/jni_reader.h
index 500301bd4b6..31ccf4af406 100644
--- a/be/src/vec/exec/format/jni_reader.h
+++ b/be/src/vec/exec/format/jni_reader.h
@@ -63,6 +63,13 @@ public:
     Status init_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_jni_connector != nullptr) {
+            _jni_connector->collect_profile_before_close();
+        }
+    }
+
 private:
     const std::vector<SlotDescriptor*>& _file_slot_descs;
     RuntimeState* _state;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index ba7488da4d0..a41fa3881b1 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1793,4 +1793,13 @@ Status 
NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc,
     return Status::OK();
 }
 
+void NewJsonReader::_collect_profile_before_close() {
+    if (_line_reader != nullptr) {
+        _line_reader->collect_profile_before_close();
+    }
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index 16b51d63f41..9d4696a6422 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -94,6 +94,9 @@ public:
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<TypeDescriptor>* col_types) override;
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     Status _get_range_params();
     void _init_system_properties();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 8bf7fe3f2d6..46512170a45 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -180,13 +180,12 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, 
const TFileRangeDesc& r
 }
 
 OrcReader::~OrcReader() {
-    _collect_profile_on_close();
     if (_obj_pool && _obj_pool.get()) {
         _obj_pool->clear();
     }
 }
 
-void OrcReader::_collect_profile_on_close() {
+void OrcReader::_collect_profile_before_close() {
     if (_profile != nullptr) {
         COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
         COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
@@ -198,6 +197,10 @@ void OrcReader::_collect_profile_on_close() {
         COUNTER_UPDATE(_orc_profile.set_fill_column_time, 
_statistics.set_fill_column_time);
         COUNTER_UPDATE(_orc_profile.decode_value_time, 
_statistics.decode_value_time);
         COUNTER_UPDATE(_orc_profile.decode_null_map_time, 
_statistics.decode_null_map_time);
+
+        if (_file_input_stream != nullptr) {
+            _file_input_stream->collect_profile_before_close();
+        }
     }
 }
 
@@ -2247,6 +2250,9 @@ MutableColumnPtr 
OrcReader::_convert_dict_column_to_string_column(
 void ORCFileInputStream::beforeReadStripe(
         std::unique_ptr<orc::StripeInformation> current_strip_information,
         std::vector<bool> selected_columns) {
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
     // Generate prefetch ranges, build stripe file reader.
     uint64_t offset = current_strip_information->getOffset();
     std::vector<io::PrefetchRange> prefetch_ranges;
@@ -2274,4 +2280,10 @@ void ORCFileInputStream::beforeReadStripe(
     }
 }
 
+void ORCFileInputStream::_collect_profile_before_close() {
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index b87ccb823fc..4cb5361b8d8 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -196,6 +196,9 @@ public:
             std::unordered_map<std::string, orc::StringDictionary*>& 
column_name_to_dict_map,
             bool* is_stripe_filtered);
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     struct OrcProfile {
         RuntimeProfile::Counter* read_time;
@@ -564,7 +567,7 @@ private:
     std::vector<orc::TypeKind>* _unsupported_pushdown_types;
 };
 
-class ORCFileInputStream : public orc::InputStream {
+class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
 public:
     ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr 
inner_reader,
                        OrcReader::Statistics* statistics, const io::IOContext* 
io_ctx,
@@ -589,6 +592,10 @@ public:
     void beforeReadStripe(std::unique_ptr<orc::StripeInformation> 
current_strip_information,
                           std::vector<bool> selected_columns) override;
 
+protected:
+    void _collect_profile_at_runtime() override {};
+    void _collect_profile_before_close() override;
+
 private:
     const std::string& _file_name;
     io::FileReaderSPtr _inner_reader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 0063fca768d..7a8f71da6cf 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -64,7 +64,7 @@ namespace doris::vectorized {
 // TODO: we need to determine it by test.
 static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = 
std::numeric_limits<uint32_t>::max();
 
-class RowGroupReader {
+class RowGroupReader : public ProfileCollector {
 public:
     static const std::vector<int64_t> NO_DELETE;
 
@@ -163,6 +163,13 @@ public:
     void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
     int64_t get_remaining_rows() { return _remaining_rows; }
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_file_reader != nullptr) {
+            _file_reader->collect_profile_before_close();
+        }
+    }
+
 private:
     void _merge_read_ranges(std::vector<RowRange>& row_ranges);
     Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* 
batch_eof);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index e0bd043c713..ff1e1afff4b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -173,43 +173,6 @@ void ParquetReader::close() {
 
 void ParquetReader::_close_internal() {
     if (!_closed) {
-        if (_profile != nullptr) {
-            COUNTER_UPDATE(_parquet_profile.filtered_row_groups, 
_statistics.filtered_row_groups);
-            COUNTER_UPDATE(_parquet_profile.to_read_row_groups, 
_statistics.read_row_groups);
-            COUNTER_UPDATE(_parquet_profile.filtered_group_rows, 
_statistics.filtered_group_rows);
-            COUNTER_UPDATE(_parquet_profile.filtered_page_rows, 
_statistics.filtered_page_rows);
-            COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows,
-                           _statistics.lazy_read_filtered_rows);
-            COUNTER_UPDATE(_parquet_profile.filtered_bytes, 
_statistics.filtered_bytes);
-            COUNTER_UPDATE(_parquet_profile.raw_rows_read, 
_statistics.read_rows);
-            COUNTER_UPDATE(_parquet_profile.to_read_bytes, 
_statistics.read_bytes);
-            COUNTER_UPDATE(_parquet_profile.column_read_time, 
_statistics.column_read_time);
-            COUNTER_UPDATE(_parquet_profile.parse_meta_time, 
_statistics.parse_meta_time);
-            COUNTER_UPDATE(_parquet_profile.parse_footer_time, 
_statistics.parse_footer_time);
-            COUNTER_UPDATE(_parquet_profile.open_file_time, 
_statistics.open_file_time);
-            COUNTER_UPDATE(_parquet_profile.open_file_num, 
_statistics.open_file_num);
-            COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
-                           _statistics.page_index_filter_time);
-            COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
-                           _statistics.row_group_filter_time);
-
-            COUNTER_UPDATE(_parquet_profile.file_read_time, 
_column_statistics.read_time);
-            COUNTER_UPDATE(_parquet_profile.file_read_calls, 
_column_statistics.read_calls);
-            COUNTER_UPDATE(_parquet_profile.file_meta_read_calls,
-                           _column_statistics.meta_read_calls);
-            COUNTER_UPDATE(_parquet_profile.file_read_bytes, 
_column_statistics.read_bytes);
-            COUNTER_UPDATE(_parquet_profile.decompress_time, 
_column_statistics.decompress_time);
-            COUNTER_UPDATE(_parquet_profile.decompress_cnt, 
_column_statistics.decompress_cnt);
-            COUNTER_UPDATE(_parquet_profile.decode_header_time,
-                           _column_statistics.decode_header_time);
-            COUNTER_UPDATE(_parquet_profile.decode_value_time,
-                           _column_statistics.decode_value_time);
-            COUNTER_UPDATE(_parquet_profile.decode_dict_time, 
_column_statistics.decode_dict_time);
-            COUNTER_UPDATE(_parquet_profile.decode_level_time,
-                           _column_statistics.decode_level_time);
-            COUNTER_UPDATE(_parquet_profile.decode_null_map_time,
-                           _column_statistics.decode_null_map_time);
-        }
         _closed = true;
     }
 }
@@ -592,6 +555,9 @@ RowGroupReader::PositionDeleteContext 
ParquetReader::_get_position_delete_ctx(
 }
 
 Status ParquetReader::_next_row_group_reader() {
+    if (_current_group_reader != nullptr) {
+        _current_group_reader->collect_profile_before_close();
+    }
     if (_read_row_groups.empty()) {
         _row_group_eof = true;
         _current_group_reader.reset(nullptr);
@@ -934,4 +900,46 @@ int64_t ParquetReader::_get_column_start_offset(const 
tparquet::ColumnMetaData&
     }
     return column.data_page_offset;
 }
+
+void ParquetReader::_collect_profile() {
+    if (_profile == nullptr) {
+        return;
+    }
+
+    if (_current_group_reader != nullptr) {
+        _current_group_reader->collect_profile_before_close();
+    }
+    COUNTER_UPDATE(_parquet_profile.filtered_row_groups, 
_statistics.filtered_row_groups);
+    COUNTER_UPDATE(_parquet_profile.to_read_row_groups, 
_statistics.read_row_groups);
+    COUNTER_UPDATE(_parquet_profile.filtered_group_rows, 
_statistics.filtered_group_rows);
+    COUNTER_UPDATE(_parquet_profile.filtered_page_rows, 
_statistics.filtered_page_rows);
+    COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows, 
_statistics.lazy_read_filtered_rows);
+    COUNTER_UPDATE(_parquet_profile.filtered_bytes, 
_statistics.filtered_bytes);
+    COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows);
+    COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes);
+    COUNTER_UPDATE(_parquet_profile.column_read_time, 
_statistics.column_read_time);
+    COUNTER_UPDATE(_parquet_profile.parse_meta_time, 
_statistics.parse_meta_time);
+    COUNTER_UPDATE(_parquet_profile.parse_footer_time, 
_statistics.parse_footer_time);
+    COUNTER_UPDATE(_parquet_profile.open_file_time, 
_statistics.open_file_time);
+    COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num);
+    COUNTER_UPDATE(_parquet_profile.page_index_filter_time, 
_statistics.page_index_filter_time);
+    COUNTER_UPDATE(_parquet_profile.row_group_filter_time, 
_statistics.row_group_filter_time);
+
+    COUNTER_UPDATE(_parquet_profile.file_read_time, 
_column_statistics.read_time);
+    COUNTER_UPDATE(_parquet_profile.file_read_calls, 
_column_statistics.read_calls);
+    COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, 
_column_statistics.meta_read_calls);
+    COUNTER_UPDATE(_parquet_profile.file_read_bytes, 
_column_statistics.read_bytes);
+    COUNTER_UPDATE(_parquet_profile.decompress_time, 
_column_statistics.decompress_time);
+    COUNTER_UPDATE(_parquet_profile.decompress_cnt, 
_column_statistics.decompress_cnt);
+    COUNTER_UPDATE(_parquet_profile.decode_header_time, 
_column_statistics.decode_header_time);
+    COUNTER_UPDATE(_parquet_profile.decode_value_time, 
_column_statistics.decode_value_time);
+    COUNTER_UPDATE(_parquet_profile.decode_dict_time, 
_column_statistics.decode_dict_time);
+    COUNTER_UPDATE(_parquet_profile.decode_level_time, 
_column_statistics.decode_level_time);
+    COUNTER_UPDATE(_parquet_profile.decode_null_map_time, 
_column_statistics.decode_null_map_time);
+}
+
+void ParquetReader::_collect_profile_before_close() {
+    _collect_profile();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 679b06fa296..bdc58669ce5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -152,6 +152,9 @@ public:
         _table_col_to_file_col = map;
     }
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     struct ParquetProfile {
         RuntimeProfile::Counter* filtered_row_groups;
@@ -211,7 +214,9 @@ private:
     std::string _meta_cache_key(const std::string& path) { return "meta_" + 
path; }
     std::vector<io::PrefetchRange> _generate_random_access_ranges(
             const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size);
+    void _collect_profile();
 
+private:
     RuntimeProfile* _profile;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 5ce9856ad8a..9426d116334 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -57,6 +57,13 @@ public:
 
     virtual Status init_row_filters(const TFileRangeDesc& range) = 0;
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_file_format_reader != nullptr) {
+            _file_format_reader->collect_profile_before_close();
+        }
+    }
+
 protected:
     std::string _table_format;                          // hudi, iceberg
     std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp
index 218bd260455..a639249cb35 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -160,32 +160,6 @@ Status JniConnector::close() {
         JNIEnv* env = nullptr;
         RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
         if (_scanner_opened) {
-            // update scanner metrics
-            for (const auto& metric : get_statistics(env)) {
-                std::vector<std::string> type_and_name = split(metric.first, ":");
-                if (type_and_name.size() != 2) {
-                    LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
-                                 << "'metricType:metricName'";
-                    continue;
-                }
-                long metric_value = std::stol(metric.second);
-                RuntimeProfile::Counter* scanner_counter;
-                if (type_and_name[0] == "timer") {
-                    scanner_counter =
-                            ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
-                } else if (type_and_name[0] == "counter") {
-                    scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
-                                                        _connector_name.c_str());
-                } else if (type_and_name[0] == "bytes") {
-                    scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
-                                                        _connector_name.c_str());
-                } else {
-                    LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
-                    continue;
-                }
-                COUNTER_UPDATE(scanner_counter, metric_value);
-            }
-
             // _fill_block may be failed and returned, we should release table in close.
             // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent
             env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
@@ -617,4 +591,40 @@ Status JniConnector::generate_meta_info(Block* block, std::unique_ptr<long[]>& m
     memcpy(meta.get(), &meta_data[0], meta_data.size() * 8);
     return Status::OK();
 }
+
+void JniConnector::_collect_profile_before_close() {
+    if (_scanner_opened && _profile != nullptr) {
+        JNIEnv* env = nullptr;
+        Status st = JniUtil::GetJNIEnv(&env);
+        if (!st) {
+            LOG(WARNING) << "failed to get jni env when collect profile: " << st;
+            return;
+        }
+        // update scanner metrics
+        for (const auto& metric : get_statistics(env)) {
+            std::vector<std::string> type_and_name = split(metric.first, ":");
+            if (type_and_name.size() != 2) {
+                LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
+                             << "'metricType:metricName'";
+                continue;
+            }
+            long metric_value = std::stol(metric.second);
+            RuntimeProfile::Counter* scanner_counter;
+            if (type_and_name[0] == "timer") {
+                scanner_counter =
+                        ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
+            } else if (type_and_name[0] == "counter") {
+                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
+                                                    _connector_name.c_str());
+            } else if (type_and_name[0] == "bytes") {
+                scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
+                                                    _connector_name.c_str());
+            } else {
+                LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
+                continue;
+            }
+            COUNTER_UPDATE(scanner_counter, metric_value);
+        }
+    }
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index c053c8a1f61..3bb39927f59 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -33,6 +33,7 @@
 #include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
 #include "runtime/types.h"
+#include "util/profile_collector.h"
 #include "util/runtime_profile.h"
 #include "util/string_util.h"
 #include "vec/aggregate_functions/aggregate_function.h"
@@ -58,7 +59,7 @@ namespace doris::vectorized {
 /**
  * Connector to java jni scanner, which should extend org.apache.doris.common.jni.JniScanner
  */
-class JniConnector {
+class JniConnector : public ProfileCollector {
 public:
     /**
      * The predicates that can be pushed down to java side.
@@ -243,6 +244,9 @@ public:
 
     static Status generate_meta_info(Block* block, std::unique_ptr<long[]>& meta);
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     std::string _connector_name;
     std::string _connector_class;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index ddecaccd195..b0e0ea6cc23 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -499,14 +499,14 @@ void NewOlapScanner::_update_realtime_counters() {
     _tablet_reader->mutable_stats()->raw_rows_read = 0;
 }
 
-void NewOlapScanner::_update_counters_before_close() {
+void NewOlapScanner::_collect_profile_before_close() {
     //  Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside.
     if (_has_updated_counter) {
         return;
     }
     _has_updated_counter = true;
 
-    VScanner::_update_counters_before_close();
+    VScanner::_collect_profile_before_close();
 
     // Update counters for NewOlapScanner
     NewOlapScanNode* olap_parent = (NewOlapScanNode*)_parent;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 6ecb6a1dc5d..debf9f8bfc7 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -78,7 +78,7 @@ public:
 
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
-    void _update_counters_before_close() override;
+    void _collect_profile_before_close() override;
 
 private:
     void _update_realtime_counters();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6143fbd2681..34e8acea8dd 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -664,6 +664,7 @@ void VFileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int l
 Status VFileScanner::_get_next_reader() {
     while (true) {
         if (_cur_reader) {
+            _cur_reader->collect_profile_before_close();
             _cur_reader->close();
         }
         _cur_reader.reset(nullptr);
@@ -1070,11 +1071,6 @@ Status VFileScanner::close(RuntimeState* state) {
         return Status::OK();
     }
 
-    if (config::enable_file_cache && _state->query_options().enable_file_cache) {
-        io::FileCacheProfileReporter cache_profile(_profile);
-        cache_profile.update(_file_cache_statistics.get());
-    }
-
     if (_cur_reader) {
         _cur_reader->close();
     }
@@ -1090,4 +1086,17 @@ void VFileScanner::try_stop() {
     }
 }
 
+void VFileScanner::_collect_profile_before_close() {
+    VScanner::_collect_profile_before_close();
+    if (config::enable_file_cache && _state->query_options().enable_file_cache &&
+        _profile != nullptr) {
+        io::FileCacheProfileReporter cache_profile(_profile);
+        cache_profile.update(_file_cache_statistics.get());
+    }
+
+    if (_cur_reader != nullptr) {
+        _cur_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 0344b04fc1c..f972337fe29 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -91,6 +91,8 @@ protected:
     // TODO: cast input block columns type to string.
     Status _cast_src_block(Block* block) { return Status::OK(); }
 
+    void _collect_profile_before_close() override;
+
 protected:
     std::unique_ptr<TextConverter> _text_converter;
     const TFileScanRangeParams* _params;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 82e81bb7ae0..2408f7ec709 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -167,7 +167,7 @@ Status VScanner::close(RuntimeState* state) {
     return Status::OK();
 }
 
-void VScanner::_update_counters_before_close() {
+void VScanner::_collect_profile_before_close() {
     if (_parent) {
         COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
         COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 7d4b36ff44d..7d5ba2fb782 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -79,7 +79,7 @@ protected:
     virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
 
     // Update the counters before closing this scanner
-    virtual void _update_counters_before_close();
+    virtual void _collect_profile_before_close();
 
     // Filter the output block finally.
     Status _filter_output_block(Block* block);
@@ -137,7 +137,7 @@ public:
         // update counters. For example, update counters depend on scanner's tablet, but
         // the tablet == null when init failed.
         if (_is_open) {
-            _update_counters_before_close();
+            _collect_profile_before_close();
         }
         _need_to_close = true;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to