This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b177b26d3988efc33b76efa0d76fadc57aa52960
Author: zclllyybb <[email protected]>
AuthorDate: Wed Feb 28 22:35:41 2024 +0800

    [branch-2.1](tracing) Pick pipeline tracing and related bugfix (#31367)
    
    * [Feature](pipeline) Trace pipeline scheduling (part I) (#31027)
    
    * [fix](compile) Fix performance compile fail #31305
    
    * [fix](compile) Fix macOS compilation issues for PURE macro and CPU core 
identification (#31357)
    
    * [fix](compile) Correct PURE macro definition to fix compilation on macOS
    
    * 2
    
    ---------
    
    Co-authored-by: zy-kkk <[email protected]>
---
 be/src/common/compiler_util.h                      |   2 +
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/exec/data_sink.cpp                          |  19 ---
 be/src/exec/data_sink.h                            |   3 -
 .../action/adjust_tracing_dump.cpp}                |  47 ++----
 .../action/adjust_tracing_dump.h}                  |  32 +---
 be/src/io/fs/file_system.h                         |  13 +-
 be/src/io/fs/local_file_system.h                   |  13 +-
 be/src/io/fs/local_file_writer.h                   |   9 +-
 be/src/pipeline/pipeline.cpp                       |  13 +-
 be/src/pipeline/pipeline.h                         |  18 ++-
 be/src/pipeline/pipeline_fragment_context.cpp      |  32 +---
 be/src/pipeline/pipeline_fragment_context.h        |  11 +-
 be/src/pipeline/pipeline_task.h                    |  11 +-
 be/src/pipeline/pipeline_tracing.cpp               | 162 +++++++++++++++++++++
 be/src/pipeline/pipeline_tracing.h                 |  83 +++++++++++
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  63 ++++----
 .../pipeline_x/pipeline_x_fragment_context.h       |   2 -
 be/src/pipeline/task_scheduler.cpp                 |  84 +++++++----
 be/src/pipeline/task_scheduler.h                   |   3 -
 be/src/runtime/exec_env.h                          |  25 ++--
 be/src/runtime/exec_env_init.cpp                   |   2 +
 be/src/runtime/fragment_mgr.cpp                    |   2 +-
 be/src/runtime/query_context.cpp                   |  24 ++-
 be/src/runtime/query_context.h                     |   7 +-
 be/src/service/http_service.cpp                    |  14 +-
 be/src/util/thrift_util.cpp                        |  34 ++++-
 be/src/util/thrift_util.h                          |  20 +--
 build.sh                                           |   1 +
 30 files changed, 501 insertions(+), 250 deletions(-)

diff --git a/be/src/common/compiler_util.h b/be/src/common/compiler_util.h
index 4b684659ed9..79588cade91 100644
--- a/be/src/common/compiler_util.h
+++ b/be/src/common/compiler_util.h
@@ -49,3 +49,5 @@
 #define MAY_ALIAS __attribute__((__may_alias__))
 
 #define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE)))
+
+#define PURE __attribute__((pure))
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 46c98bb763b..b49b95e8e5d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -199,6 +199,7 @@ DEFINE_mInt32(download_low_speed_time, "300");
 // log dir
 DEFINE_String(sys_log_dir, "${DORIS_HOME}/log");
 DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
+DEFINE_String(pipeline_tracing_log_dir, "${DORIS_HOME}/log/tracing");
 // INFO, WARNING, ERROR, FATAL
 DEFINE_mString(sys_log_level, "INFO");
 // TIME-DAY, TIME-HOUR, SIZE-MB-nnn
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8878cba0e64..c82d42a9ef4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -247,6 +247,7 @@ DECLARE_mInt32(download_low_speed_time);
 // log dir
 DECLARE_String(sys_log_dir);
 DECLARE_String(user_function_dir);
+DECLARE_String(pipeline_tracing_log_dir);
 // INFO, WARNING, ERROR, FATAL
 DECLARE_String(sys_log_level);
 // TIME-DAY, TIME-HOUR, SIZE-MB-nnn
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 5809912f4a2..5047ae8fc78 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -346,23 +346,4 @@ Status DataSink::init(const TDataSink& thrift_sink) {
 Status DataSink::prepare(RuntimeState* state) {
     return Status::OK();
 }
-
-bool DataSink::_has_inverted_index_or_partial_update(TOlapTableSink sink) {
-    OlapTableSchemaParam schema;
-    if (!schema.init(sink.schema).ok()) {
-        return false;
-    }
-    if (schema.is_partial_update()) {
-        return true;
-    }
-    for (const auto& index_schema : schema.indexes()) {
-        for (const auto& index : index_schema->indexes) {
-            if (index->index_type() == INVERTED) {
-                return true;
-            }
-        }
-    }
-    return false;
-}
-
 } // namespace doris
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index e08d40ab023..5258929ba79 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -106,9 +106,6 @@ public:
 
     std::shared_ptr<QueryStatistics> get_query_statistics_ptr();
 
-private:
-    static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
-
 protected:
     // Set to true after close() has been called. subclasses should check and 
set this in
     // close().
diff --git a/be/src/io/fs/local_file_writer.h 
b/be/src/http/action/adjust_tracing_dump.cpp
similarity index 52%
copy from be/src/io/fs/local_file_writer.h
copy to be/src/http/action/adjust_tracing_dump.cpp
index b138879e358..55d1526d82b 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/http/action/adjust_tracing_dump.cpp
@@ -15,38 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+#include "adjust_tracing_dump.h"
 
-#include <cstddef>
-
-#include "common/status.h"
-#include "io/fs/file_system.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
+#include "common/logging.h"
+#include "http/http_channel.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "runtime/exec_env.h"
 
 namespace doris {
-namespace io {
-
-class LocalFileWriter final : public FileWriter {
-public:
-    LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data = 
true);
-    LocalFileWriter(Path path, int fd);
-    ~LocalFileWriter() override;
-
-    Status close() override;
-    Status appendv(const Slice* data, size_t data_cnt) override;
-    Status finalize() override;
-
-private:
-    void _abort();
-    Status _close(bool sync);
-
-private:
-    int _fd; // owned
-    bool _dirty = false;
-    const bool _sync_data;
-};
-
-} // namespace io
+void AdjustTracingDump::handle(HttpRequest* req) {
+    auto* ctx = ExecEnv::GetInstance()->pipeline_tracer_context();
+    auto* params = req->params();
+    if (auto status = ctx->change_record_params(*params); status.ok()) {
+        HttpChannel::send_reply(req, "change record type succeed!\n"); // ok
+    } else {                                                           // not 
ok
+        LOG(WARNING) << "adjust pipeline tracing dump method failed:" << 
status.msg() << '\n';
+        HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, 
status.msg().data());
+    }
+}
 } // namespace doris
diff --git a/be/src/io/fs/local_file_writer.h 
b/be/src/http/action/adjust_tracing_dump.h
similarity index 56%
copy from be/src/io/fs/local_file_writer.h
copy to be/src/http/action/adjust_tracing_dump.h
index b138879e358..d82d8b1e55b 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/http/action/adjust_tracing_dump.h
@@ -17,36 +17,18 @@
 
 #pragma once
 
-#include <cstddef>
-
-#include "common/status.h"
-#include "io/fs/file_system.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
+#include "http/http_handler.h"
 
 namespace doris {
-namespace io {
 
-class LocalFileWriter final : public FileWriter {
-public:
-    LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data = 
true);
-    LocalFileWriter(Path path, int fd);
-    ~LocalFileWriter() override;
+class HttpRequest;
 
-    Status close() override;
-    Status appendv(const Slice* data, size_t data_cnt) override;
-    Status finalize() override;
+class AdjustTracingDump : public HttpHandler {
+public:
+    AdjustTracingDump() = default;
 
-private:
-    void _abort();
-    Status _close(bool sync);
+    ~AdjustTracingDump() override = default;
 
-private:
-    int _fd; // owned
-    bool _dirty = false;
-    const bool _sync_data;
+    void handle(HttpRequest* req) override;
 };
-
-} // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index 82d3911ae9d..a8cdf5f4eb6 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -19,8 +19,8 @@
 
 #include <butil/macros.h>
 #include <glog/logging.h>
-#include <stdint.h>
 
+#include <cstdint>
 #include <memory>
 #include <string>
 #include <vector>
@@ -29,8 +29,7 @@
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/path.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 
 #ifndef FILESYSTEM_M
 #define FILESYSTEM_M(stmt)                                  \
@@ -83,7 +82,6 @@ public:
 
     std::shared_ptr<FileSystem> getSPtr() { return shared_from_this(); }
 
-public:
     // the root path of this fs.
     // if not empty, all given Path will be "_root_path/path"
     const Path& root_path() const { return _root_path; }
@@ -97,7 +95,8 @@ public:
     virtual ~FileSystem() = default;
 
     // Each derived class should implement create method to create fs.
-    DISALLOW_COPY_AND_ASSIGN(FileSystem);
+    FileSystem(const FileSystem&) = delete;
+    const FileSystem& operator=(const FileSystem&) = delete;
 
 protected:
     /// create file and return a FileWriter
@@ -152,7 +151,6 @@ protected:
         return _root_path / path;
     }
 
-protected:
     FileSystem(Path&& root_path, std::string&& id, FileSystemType type)
             : _root_path(std::move(root_path)), _id(std::move(id)), 
_type(type) {}
 
@@ -163,5 +161,4 @@ protected:
 
 using FileSystemSPtr = std::shared_ptr<FileSystem>;
 
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 705f0ed36e9..8578b9f5ac2 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -17,9 +17,8 @@
 
 #pragma once
 
-#include <stdint.h>
-#include <time.h>
-
+#include <cstdint>
+#include <ctime>
 #include <functional>
 #include <memory>
 #include <string>
@@ -29,8 +28,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/path.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 
 class LocalFileSystem final : public FileSystem {
 public:
@@ -106,7 +104,6 @@ private:
     LocalFileSystem(Path&& root_path, std::string&& id = "");
 };
 
-const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
+PURE const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
 
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h
index b138879e358..8c8d436d54e 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/local_file_writer.h
@@ -25,8 +25,7 @@
 #include "io/fs/path.h"
 #include "util/slice.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 
 class LocalFileWriter final : public FileWriter {
 public:
@@ -42,11 +41,9 @@ private:
     void _abort();
     Status _close(bool sync);
 
-private:
     int _fd; // owned
     bool _dirty = false;
-    const bool _sync_data;
+    const bool _sync_data = false;
 };
 
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 9a30cee5ab9..9c9d7cd6099 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -17,7 +17,8 @@
 
 #include "pipeline.h"
 
-#include <ostream>
+#include <memory>
+#include <string>
 #include <utility>
 
 #include "pipeline/exec/operator.h"
@@ -26,10 +27,13 @@ namespace doris::pipeline {
 
 void Pipeline::_init_profile() {
     auto s = fmt::format("Pipeline (pipeline id={})", _pipeline_id);
-    _pipeline_profile.reset(new RuntimeProfile(std::move(s)));
+    _pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
 }
 
 Status Pipeline::build_operators() {
+    _name.reserve(_operator_builders.size() * 10);
+    _name.append(std::to_string(id()));
+
     OperatorPtr pre;
     for (auto& operator_t : _operator_builders) {
         auto o = operator_t->build_operator();
@@ -37,6 +41,11 @@ Status Pipeline::build_operators() {
             static_cast<void>(o->set_child(pre));
         }
         _operators.emplace_back(o);
+
+        _name.push_back('-');
+        _name.append(std::to_string(operator_t->id()));
+        _name.append(o->get_name());
+
         pre = std::move(o);
     }
     return Status::OK();
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 148191f4e2d..ab5b7e36bc2 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -18,11 +18,11 @@
 #pragma once
 
 #include <glog/logging.h>
-#include <stdint.h>
 
-#include <algorithm>
-#include <atomic>
+#include <cstdint>
 #include <memory>
+#include <string_view>
+#include <utility>
 #include <vector>
 
 #include "common/status.h"
@@ -42,18 +42,19 @@ using PipelineId = uint32_t;
 class Pipeline : public std::enable_shared_from_this<Pipeline> {
     friend class PipelineTask;
     friend class PipelineXTask;
+    friend class PipelineXFragmentContext;
 
 public:
     Pipeline() = delete;
     explicit Pipeline(PipelineId pipeline_id, int num_tasks,
                       std::weak_ptr<PipelineFragmentContext> context)
-            : _pipeline_id(pipeline_id), _context(context), 
_num_tasks(num_tasks) {
+            : _pipeline_id(pipeline_id), _context(std::move(context)), 
_num_tasks(num_tasks) {
         _init_profile();
     }
 
     void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
-        pipeline->_parents.push_back({_operator_builders.size(), 
weak_from_this()});
-        _dependencies.push_back({_operator_builders.size(), pipeline});
+        pipeline->_parents.emplace_back(_operator_builders.size(), 
weak_from_this());
+        _dependencies.emplace_back(_operator_builders.size(), pipeline);
     }
 
     // If all dependencies are finished, this pipeline task should be 
scheduled.
@@ -192,6 +193,11 @@ private:
     std::weak_ptr<PipelineFragmentContext> _context;
     int _previous_schedule_id = -1;
 
+    // pipeline id + operator names. init when:
+    //  build_operators(), if pipeline;
+    //  _build_pipelines() and _create_tree_helper(), if pipelineX.
+    std::string _name;
+
     std::unique_ptr<RuntimeProfile> _pipeline_profile;
 
     // Operators for pipelineX. All pipeline tasks share operators from this.
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 358086e94eb..239173fa781 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -117,10 +117,10 @@ namespace doris::pipeline {
 bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");
 
 PipelineFragmentContext::PipelineFragmentContext(
-        const TUniqueId& query_id, const TUniqueId& instance_id, const int 
fragment_id,
-        int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* 
exec_env,
+        const TUniqueId& query_id, const TUniqueId& instance_id, int 
fragment_id, int backend_num,
+        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
         const std::function<void(RuntimeState*, Status*)>& call_back,
-        const report_status_callback& report_status_cb)
+        report_status_callback report_status_cb)
         : _query_id(query_id),
           _fragment_instance_id(instance_id),
           _fragment_id(fragment_id),
@@ -129,7 +129,7 @@ PipelineFragmentContext::PipelineFragmentContext(
           _query_ctx(std::move(query_ctx)),
           _call_back(call_back),
           _is_report_on_cancel(true),
-          _report_status_cb(report_status_cb),
+          _report_status_cb(std::move(report_status_cb)),
           _create_time(MonotonicNanos()) {
     _fragment_watcher.start();
 }
@@ -951,31 +951,13 @@ Status PipelineFragmentContext::send_report(bool done) {
              _fragment_instance_id,
              _backend_num,
              _runtime_state.get(),
-             [this](auto&& PH1) { return 
update_status(std::forward<decltype(PH1)>(PH1)); },
-             [this](auto&& PH1, auto&& PH2) {
-                 cancel(std::forward<decltype(PH1)>(PH1), 
std::forward<decltype(PH2)>(PH2));
+             [this](Status st) { return update_status(st); },
+             [this](const PPlanFragmentCancelReason& reason, const 
std::string& msg) {
+                 cancel(reason, msg);
              }},
             
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
-bool 
PipelineFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink 
sink) {
-    OlapTableSchemaParam schema;
-    if (!schema.init(sink.schema).ok()) {
-        return false;
-    }
-    if (schema.is_partial_update()) {
-        return true;
-    }
-    for (const auto& index_schema : schema.indexes()) {
-        for (const auto& index : index_schema->indexes) {
-            if (index->index_type() == INVERTED) {
-                return true;
-            }
-        }
-    }
-    return false;
-}
-
 std::string PipelineFragmentContext::debug_string() {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info: QueryId 
= {}\n",
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 4c805b50582..9925689cb2c 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -19,13 +19,11 @@
 
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/types.pb.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <atomic>
-#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
 #include <functional>
-#include <future>
 #include <memory>
 #include <mutex>
 #include <string>
@@ -62,10 +60,10 @@ public:
     using report_status_callback = std::function<Status(
             const ReportStatusRequest, 
std::shared_ptr<pipeline::PipelineFragmentContext>&&)>;
     PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& 
instance_id,
-                            const int fragment_id, int backend_num,
+                            int fragment_id, int backend_num,
                             std::shared_ptr<QueryContext> query_ctx, ExecEnv* 
exec_env,
                             const std::function<void(RuntimeState*, Status*)>& 
call_back,
-                            const report_status_callback& report_status_cb);
+                            report_status_callback report_status_cb);
 
     ~PipelineFragmentContext() override;
 
@@ -210,7 +208,6 @@ protected:
     int _num_instances = 1;
 
 private:
-    static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
 
     uint64_t _create_time;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index b54a6de593d..517d6b8a8de 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -17,11 +17,10 @@
 
 #pragma once
 
-#include <stdint.h>
-
+#include <cstdint>
 #include <memory>
 #include <string>
-#include <vector>
+#include <string_view>
 
 #include "common/config.h"
 #include "common/status.h"
@@ -266,7 +265,7 @@ public:
         }
         // If enable_debug_log_timeout_secs <= 0, then disable the log
         if (_pipeline_task_watcher.elapsed_time() >
-            config::enable_debug_log_timeout_secs * 1000l * 1000l * 1000l) {
+            config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) {
             _has_exceed_timeout = true;
             return true;
         }
@@ -286,7 +285,9 @@ public:
         }
     }
 
-    RuntimeState* runtime_state() { return _state; }
+    RuntimeState* runtime_state() const { return _state; }
+
+    std::string task_name() const { return fmt::format("task{}({})", _index, 
_pipeline->_name); }
 
 protected:
     void _finish_p_dependency() {
diff --git a/be/src/pipeline/pipeline_tracing.cpp 
b/be/src/pipeline/pipeline_tracing.cpp
new file mode 100644
index 00000000000..94675f77f63
--- /dev/null
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline_tracing.h"
+
+#include <absl/time/clock.h>
+#include <fcntl.h>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <chrono>
+#include <cstdint>
+#include <mutex>
+#include <string>
+
+#include "common/config.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "io/fs/local_file_writer.h"
+#include "util/time.h"
+
+namespace doris::pipeline {
+
+void PipelineTracerContext::record(ScheduleRecord record) {
+    if (_dump_type == RecordType::None) [[unlikely]] {
+        return;
+    }
+    if (_datas.contains(record.query_id)) {
+        _datas[record.query_id].enqueue(record);
+    } else {
+        std::unique_lock<std::mutex> l(_data_lock); // add new item, may rehash
+        _datas[record.query_id].enqueue(record);
+    }
+}
+
+void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t task_group) 
{
+    {
+        std::unique_lock<std::mutex> l(_tg_lock);
+        _id_to_taskgroup[query_id] = task_group;
+    }
+    if (_dump_type == RecordType::PerQuery) {
+        _dump(query_id);
+    } else if (_dump_type == RecordType::Periodic) {
+        auto now = MonotonicSeconds();
+        auto interval = now - _last_dump_time;
+        if (interval > _dump_interval_s) {
+            _dump(query_id);
+        }
+    }
+}
+
+Status PipelineTracerContext::change_record_params(
+        const std::map<std::string, std::string>& params) {
+    bool effective = false;
+    if (auto it = params.find("type"); it != params.end()) {
+        if (boost::iequals(it->second, "disable") || 
boost::iequals(it->second, "none")) {
+            _dump_type = RecordType::None;
+            effective = true;
+        } else if (boost::iequals(it->second, "per_query") ||
+                   boost::iequals(it->second, "perquery")) {
+            _dump_type = RecordType::PerQuery;
+            effective = true;
+        } else if (boost::iequals(it->second, "periodic")) {
+            _dump_type = RecordType::Periodic;
+            _last_dump_time = MonotonicSeconds();
+            effective = true;
+        }
+    }
+
+    if (auto it = params.find("dump_interval"); it != params.end()) {
+        _dump_interval_s = std::stoll(it->second); // s as unit
+        effective = true;
+    }
+
+    return effective ? Status::OK()
+                     : Status::InvalidArgument(
+                               "No qualified param in changing tracing record 
method");
+}
+
+void PipelineTracerContext::_dump(TUniqueId query_id) {
+    if (_dump_type == RecordType::None) {
+        return;
+    }
+
+    //TODO: when dump, now could append records but can't add new query. try 
use better grained locks.
+    std::unique_lock<std::mutex> l(_data_lock); // can't rehash
+    if (_dump_type == RecordType::PerQuery) {
+        auto path = _dir / fmt::format("query{}", to_string(query_id));
+        int fd = ::open(
+                path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+                S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | 
S_IWOTH | S_IROTH);
+        if (fd < 0) [[unlikely]] {
+            throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+                    "create tracing log file {} failed", path.c_str()));
+        }
+        auto writer = io::LocalFileWriter {path, fd};
+
+        ScheduleRecord record;
+        while (_datas[query_id].try_dequeue(record)) {
+            uint64_t v = 0;
+            {
+                std::unique_lock<std::mutex> l(_tg_lock);
+                v = _id_to_taskgroup[query_id];
+            }
+            auto tmp_str = record.to_string(v);
+            auto text = Slice {tmp_str};
+            THROW_IF_ERROR(writer.appendv(&text, 1));
+        }
+
+        THROW_IF_ERROR(writer.finalize());
+        THROW_IF_ERROR(writer.close());
+    } else if (_dump_type == RecordType::Periodic) {
+        auto path = _dir / fmt::format("until{}",
+                                       
std::chrono::steady_clock::now().time_since_epoch().count());
+        int fd = ::open(
+                path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+                S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | 
S_IWOTH | S_IROTH);
+        if (fd < 0) [[unlikely]] {
+            throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+                    "create tracing log file {} failed", path.c_str()));
+        }
+        auto writer = io::LocalFileWriter {path, fd};
+
+        for (auto& [id, trace] : _datas) {
+            ScheduleRecord record;
+            while (trace.try_dequeue(record)) {
+                uint64_t v = 0;
+                {
+                    std::unique_lock<std::mutex> l(_tg_lock);
+                    v = _id_to_taskgroup[query_id];
+                }
+                auto tmp_str = record.to_string(v);
+                auto text = Slice {tmp_str};
+                THROW_IF_ERROR(writer.appendv(&text, 1));
+            }
+        }
+        THROW_IF_ERROR(writer.finalize());
+        THROW_IF_ERROR(writer.close());
+
+        _last_dump_time = MonotonicSeconds();
+    }
+
+    _datas.erase(query_id);
+    {
+        std::unique_lock<std::mutex> l(_tg_lock);
+        _id_to_taskgroup.erase(query_id);
+    }
+}
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h 
b/be/src/pipeline/pipeline_tracing.h
new file mode 100644
index 00000000000..3160148c570
--- /dev/null
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <concurrentqueue.h>
+#include <fmt/core.h>
+#include <gen_cpp/Types_types.h>
+#include <parallel_hashmap/phmap.h>
+
+#include <cstdint>
+#include <filesystem>
+
+#include "common/config.h"
+#include "util/hash_util.hpp" // IWYU pragma: keep
+#include "util/thrift_util.h"
+#include "util/time.h"
+
+namespace doris::pipeline {
+
+struct ScheduleRecord {
+    TUniqueId query_id;
+    std::string task_id;
+    uint32_t core_id;
+    uint64_t thread_id;
+    uint64_t start_time;
+    uint64_t end_time;
+    std::string state_name;
+
+    bool operator<(const ScheduleRecord& rhs) const { return start_time < 
rhs.start_time; }
+    std::string to_string(uint64_t append_value) const {
+        return fmt::format("{}|{}|{}|{}|{}|{}|{}|{}\n", 
doris::to_string(query_id), task_id,
+                           core_id, thread_id, start_time, end_time, 
state_name, append_value);
+    }
+};
+
+// all tracing data of ONE specific query
+using OneQueryTraces = moodycamel::ConcurrentQueue<ScheduleRecord>;
+
+// belongs to exec_env, for all query, if enable
+class PipelineTracerContext {
+public:
+    enum class RecordType {
+        None,     // disable
+        PerQuery, // record per query. one query one file.
+        Periodic  // record per times. one timeslice one file.
+    };
+    void record(ScheduleRecord record); // record one schedule record
+    void end_query(TUniqueId query_id,
+                   uint64_t task_group); // tell context this query is end. 
may lead to a dump.
+    Status change_record_params(const std::map<std::string, std::string>& 
params);
+
+    bool enabled() const { return !(_dump_type == RecordType::None); }
+
+private:
+    void _dump(TUniqueId query_id); // dump data to disk. one query or all.
+
+    std::mutex _data_lock; // lock for map, not map items.
+    phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas;
+    std::mutex _tg_lock; //TODO: use a lockfree DS
+    phmap::flat_hash_map<TUniqueId, uint64_t> _id_to_taskgroup;
+
+    RecordType _dump_type = RecordType::None;
+    std::filesystem::path _dir = config::pipeline_tracing_log_dir;
+    decltype(MonotonicSeconds()) _last_dump_time;
+    decltype(MonotonicSeconds()) _dump_interval_s =
+            60; // effective iff Periodic mode. 1 minute default.
+};
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 696fcfefba5..f13cf37b1fb 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -23,12 +23,13 @@
 #include <gen_cpp/Planner_types.h>
 #include <pthread.h>
 #include <runtime/result_buffer_mgr.h>
-#include <stdlib.h>
+
+#include <cstdlib>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <map>
+#include <memory>
 #include <ostream>
-#include <typeinfo>
 #include <utility>
 
 #include "common/config.h"
@@ -42,7 +43,6 @@
 #include "pipeline/exec/analytic_sink_operator.h"
 #include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
-#include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/datagen_operator.h"
 #include "pipeline/exec/distinct_streaming_aggregation_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
@@ -67,7 +67,6 @@
 #include "pipeline/exec/repeat_operator.h"
 #include "pipeline/exec/result_file_sink_operator.h"
 #include "pipeline/exec/result_sink_operator.h"
-#include "pipeline/exec/scan_operator.h"
 #include "pipeline/exec/schema_scan_operator.h"
 #include "pipeline/exec/select_operator.h"
 #include "pipeline/exec/set_probe_sink_operator.h"
@@ -93,7 +92,6 @@
 #include "util/container_util.hpp"
 #include "util/debug_util.h"
 #include "util/uid_util.h"
-#include "vec/common/assert_cast.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
 namespace doris::pipeline {
@@ -162,7 +160,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     }
     _num_instances = request.local_params.size();
     _total_instances = request.__isset.total_instances ? 
request.total_instances : _num_instances;
-    _runtime_profile.reset(new RuntimeProfile("PipelineContext"));
+    _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
     _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
     SCOPED_TIMER(_prepare_timer);
 
@@ -378,7 +376,7 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
 
         // TODO: figure out good buffer size based on size of output row
         // Result file sink is not the top sink
-        if (params.__isset.destinations && params.destinations.size() > 0) {
+        if (params.__isset.destinations && !params.destinations.empty()) {
             _sink.reset(new ResultFileSinkOperatorX(next_sink_operator_id(), 
row_desc,
                                                     
thrift_sink.result_file_sink,
                                                     params.destinations, 
output_exprs, desc_tbl));
@@ -408,7 +406,7 @@ Status 
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
             auto new_pipeline = add_pipeline();
             RowDescriptor* _row_desc = nullptr;
             {
-                auto& tmp_row_desc =
+                const auto& tmp_row_desc =
                         
!thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
                                 ? RowDescriptor(state->desc_tbl(),
                                                 
{thrift_sink.multi_cast_stream_sink.sinks[i]
@@ -602,14 +600,14 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             return Status::OK();
         };
 
-        for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
-            if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
-                auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+        for (auto& _pipeline : _pipelines) {
+            if (pipeline_id_to_task.contains(_pipeline->id())) {
+                auto* task = pipeline_id_to_task[_pipeline->id()];
                 DCHECK(task != nullptr);
 
                 // if this task has upstream dependency, then record them.
-                if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
-                    auto& deps = _dag[_pipelines[pip_idx]->id()];
+                if (_dag.find(_pipeline->id()) != _dag.end()) {
+                    auto& deps = _dag[_pipeline->id()];
                     for (auto& dep : deps) {
                         if (pipeline_id_to_task.contains(dep)) {
                             task->add_upstream_dependency(
@@ -642,11 +640,14 @@ Status 
PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
                                                   const 
doris::TPipelineFragmentParams& request,
                                                   const DescriptorTbl& descs, 
OperatorXPtr* root,
                                                   PipelinePtr cur_pipe) {
-    if (request.fragment.plan.nodes.size() == 0) {
+    if (request.fragment.plan.nodes.empty()) {
         throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no 
plan node!");
     }
 
     int node_idx = 0;
+
+    cur_pipe->_name.append(std::to_string(cur_pipe->id()));
+
     RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, 
request, descs, nullptr,
                                         &node_idx, root, cur_pipe, 0));
 
@@ -688,6 +689,10 @@ Status 
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
         *root = op;
     }
 
+    cur_pipe->_name.push_back('-');
+    cur_pipe->_name.append(std::to_string(op->id()));
+    cur_pipe->_name.append(op->get_name());
+
     // rely on that tnodes is preorder of the plan
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
@@ -875,6 +880,8 @@ Status PipelineXFragmentContext::_add_local_exchange(
     return Status::OK();
 }
 
+// NOLINTBEGIN(readability-function-size)
+// NOLINTBEGIN(readability-function-cognitive-complexity)
 Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const 
TPlanNode& tnode,
                                                   const 
doris::TPipelineFragmentParams& request,
                                                   const DescriptorTbl& descs, 
OperatorXPtr& op,
@@ -1150,6 +1157,9 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
 
     return Status::OK();
 }
+// NOLINTEND(readability-function-cognitive-complexity)
+// NOLINTEND(readability-function-size)
+
 template <bool is_intersect>
 Status PipelineXFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, 
OperatorXPtr& op,
@@ -1295,30 +1305,13 @@ Status PipelineXFragmentContext::send_report(bool done) 
{
             {true, exec_status, runtime_states, nullptr, 
_runtime_state->load_channel_profile(),
              done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, 
_fragment_id,
              TUniqueId(), _backend_num, _runtime_state.get(),
-             std::bind(&PipelineFragmentContext::update_status, this, 
std::placeholders::_1),
-             std::bind(&PipelineFragmentContext::cancel, this, 
std::placeholders::_1,
-                       std::placeholders::_2)},
+             [this](Status st) { return update_status(st); },
+             [this](const PPlanFragmentCancelReason& reason, const 
std::string& msg) {
+                 cancel(reason, msg);
+             }},
             
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
 }
 
-bool 
PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink 
sink) {
-    OlapTableSchemaParam schema;
-    if (!schema.init(sink.schema).ok()) {
-        return false;
-    }
-    if (schema.is_partial_update()) {
-        return true;
-    }
-    for (const auto& index_schema : schema.indexes()) {
-        for (const auto& index : index_schema->indexes) {
-            if (index->index_type() == INVERTED) {
-                return true;
-            }
-        }
-    }
-    return false;
-}
-
 std::string PipelineXFragmentContext::debug_string() {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "PipelineXFragmentContext Info:\n");
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 102dd854998..9630484f443 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -169,8 +169,6 @@ private:
                                 const std::map<int, int>& 
shuffle_idx_to_instance_idx,
                                 const bool ignore_data_distribution);
 
-    bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
-
     bool _enable_local_shuffle() const { return 
_runtime_state->enable_local_shuffle(); }
 
     OperatorXPtr _root_op = nullptr;
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 98678685d3f..5032bdef6b7 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -23,33 +23,33 @@
 #include <glog/logging.h>
 #include <sched.h>
 
-#include <algorithm>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <functional>
 #include <ostream>
 #include <string>
 #include <thread>
+#include <utility>
 
 #include "common/logging.h"
-#include "common/signal_handler.h"
 #include "pipeline/pipeline_task.h"
-#include "pipeline/pipeline_x/dependency.h"
 #include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "pipeline/task_queue.h"
 #include "pipeline_fragment_context.h"
+#include "runtime/exec_env.h"
 #include "runtime/query_context.h"
 #include "util/debug_util.h"
 #include "util/sse_util.hpp"
 #include "util/thread.h"
 #include "util/threadpool.h"
+#include "util/time.h"
 #include "util/uid_util.h"
 #include "vec/runtime/vdatetime_value.h"
 
 namespace doris::pipeline {
 
 BlockedTaskScheduler::BlockedTaskScheduler(std::string name)
-        : _name(name), _started(false), _shutdown(false) {}
+        : _name(std::move(name)), _started(false), _shutdown(false) {}
 
 Status BlockedTaskScheduler::start() {
     LOG(INFO) << "BlockedTaskScheduler start";
@@ -192,7 +192,7 @@ void BlockedTaskScheduler::_schedule() {
 void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& 
local_tasks,
                                           std::list<PipelineTask*>::iterator& 
task_itr,
                                           PipelineTaskState t_state) {
-    auto task = *task_itr;
+    auto* task = *task_itr;
     task->set_state(t_state);
     local_tasks.erase(task_itr++);
     static_cast<void>(task->get_task_queue()->push_back(task));
@@ -215,8 +215,7 @@ Status TaskScheduler::start() {
     _markers.reserve(cores);
     for (size_t i = 0; i < cores; ++i) {
         _markers.push_back(std::make_unique<std::atomic<bool>>(true));
-        RETURN_IF_ERROR(
-                
_fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i)));
+        RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); 
}));
     }
     return Status::OK();
 }
@@ -226,6 +225,29 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
     // TODO control num of task
 }
 
+// after _close_task, task maybe destructed.
+void _close_task(PipelineTask* task, PipelineTaskState state, Status 
exec_status) {
+    // close_a_pipeline may delete fragment context and will core in some defer
+    // code, because the defer code will access fragment context itself.
+    auto lock_for_context = task->fragment_context()->shared_from_this();
+    // is_pending_finish does not check status, so has to check status in 
close API.
+    // For example, in async writer, the writer may fail while dealing with 
eos_block
+    // but it does not return error status. Has to check the error status in 
close API.
+    // We have already refactored all source and sink APIs; the close API does 
not need to wait
+    // for pending finish now, so we can call close directly.
+    Status status = task->close(exec_status);
+    if (!status.ok() && state != PipelineTaskState::CANCELED) {
+        task->query_context()->cancel(true, status.to_string(),
+                                      Status::Cancelled(status.to_string()));
+        state = PipelineTaskState::CANCELED;
+    }
+    task->set_state(state);
+    task->set_close_pipeline_time();
+    task->finalize();
+    task->set_running(false);
+    task->fragment_context()->close_a_pipeline();
+}
+
 void TaskScheduler::_do_work(size_t index) {
     const auto& marker = _markers[index];
     while (*marker) {
@@ -286,7 +308,31 @@ void TaskScheduler::_do_work(size_t index) {
         auto status = Status::OK();
 
         try {
-            status = task->execute(&eos);
+            //TODO: use a better enclose to abstracting these
+            if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+                TUniqueId query_id = task->query_context()->query_id();
+                std::string task_name = task->task_name();
+#ifdef __APPLE__
+                uint32_t core_id = 0;
+#else
+                uint32_t core_id = sched_getcpu();
+#endif
+                std::thread::id tid = std::this_thread::get_id();
+                uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
+                uint64_t start_time = MonotonicMicros();
+
+                status = task->execute(&eos);
+
+                uint64_t end_time = MonotonicMicros();
+                auto state = task->get_state();
+                std::string state_name =
+                        state == PipelineTaskState::RUNNABLE ? 
get_state_name(state) : "";
+                ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+                        {query_id, task_name, core_id, thread_id, start_time, 
end_time,
+                         state_name});
+            } else {
+                status = task->execute(&eos);
+            }
         } catch (const Exception& e) {
             status = e.to_status();
         }
@@ -372,28 +418,6 @@ void TaskScheduler::_do_work(size_t index) {
     }
 }
 
-void TaskScheduler::_close_task(PipelineTask* task, PipelineTaskState state, 
Status exec_status) {
-    // close_a_pipeline may delete fragment context and will core in some defer
-    // code, because the defer code will access fragment context it self.
-    auto lock_for_context = task->fragment_context()->shared_from_this();
-    // is_pending_finish does not check status, so has to check status in 
close API.
-    // For example, in async writer, the writer may failed during dealing with 
eos_block
-    // but it does not return error status. Has to check the error status in 
close API.
-    // We have already refactor all source and sink api, the close API does 
not need waiting
-    // for pending finish now. So that could call close directly.
-    Status status = task->close(exec_status);
-    if (!status.ok() && state != PipelineTaskState::CANCELED) {
-        task->query_context()->cancel(true, status.to_string(),
-                                      Status::Cancelled(status.to_string()));
-        state = PipelineTaskState::CANCELED;
-    }
-    task->set_state(state);
-    task->set_close_pipeline_time();
-    task->finalize();
-    task->set_running(false);
-    task->fragment_context()->close_a_pipeline();
-}
-
 void TaskScheduler::stop() {
     if (!this->_shutdown.load()) {
         if (_task_queue) {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 41ac8c0c098..3074cf02afc 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -103,8 +103,5 @@ private:
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
 
     void _do_work(size_t index);
-    // after _close_task, task maybe destructed.
-    void _close_task(PipelineTask* task, PipelineTaskState state,
-                     Status exec_status = Status::OK());
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 1f6b1b7e8b9..89d871be293 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -34,6 +34,7 @@
 #include "olap/olap_define.h"
 #include "olap/options.h"
 #include "olap/tablet_fwd.h"
+#include "pipeline/pipeline_tracing.h"
 #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove 
this include header
 #include "util/threadpool.h"
 
@@ -143,7 +144,7 @@ public:
     static bool ready() { return _s_ready.load(std::memory_order_acquire); }
     const std::string& token() const;
     ExternalScanContextMgr* external_scan_context_mgr() { return 
_external_scan_context_mgr; }
-    doris::vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; }
+    vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; }
     ResultBufferMgr* result_mgr() { return _result_mgr; }
     ResultQueueMgr* result_queue_mgr() { return _result_queue_mgr; }
     ClientCache<BackendServiceClient>* client_cache() { return 
_backend_client_cache; }
@@ -206,7 +207,7 @@ public:
     std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return 
_stream_load_executor; }
     RoutineLoadTaskExecutor* routine_load_task_executor() { return 
_routine_load_task_executor; }
     HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
-    doris::vectorized::ScannerScheduler* scanner_scheduler() { return 
_scanner_scheduler; }
+    vectorized::ScannerScheduler* scanner_scheduler() { return 
_scanner_scheduler; }
     FileMetaCache* file_meta_cache() { return _file_meta_cache; }
     MemTableMemoryLimiter* memtable_memory_limiter() { return 
_memtable_memory_limiter.get(); }
     WalManager* wal_mgr() { return _wal_manager.get(); }
@@ -264,14 +265,18 @@ public:
     }
     std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return 
_dummy_lru_cache; }
 
-    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
get_global_block_scheduler() {
+    std::shared_ptr<pipeline::BlockedTaskScheduler> 
get_global_block_scheduler() {
         return _global_block_scheduler;
     }
 
-    doris::pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
+    pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
         return _runtime_filter_timer_queue;
     }
 
+    pipeline::PipelineTracerContext* pipeline_tracer_context() {
+        return _pipeline_tracer_ctx.get();
+    }
+
 private:
     ExecEnv();
 
@@ -291,7 +296,7 @@ private:
     UserFunctionCache* _user_function_cache = nullptr;
     // Leave protected so that subclasses can override
     ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
-    doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
+    vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
     ResultBufferMgr* _result_mgr = nullptr;
     ResultQueueMgr* _result_queue_mgr = nullptr;
     ClientCache<BackendServiceClient>* _backend_client_cache = nullptr;
@@ -342,7 +347,7 @@ private:
     RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
     SmallFileMgr* _small_file_mgr = nullptr;
     HeartbeatFlags* _heartbeat_flags = nullptr;
-    doris::vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
+    vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
 
     BlockSpillManager* _block_spill_mgr = nullptr;
     // To save meta info of external file, such as parquet footer.
@@ -374,15 +379,17 @@ private:
     std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
 
     // used for query with group cpu hard limit
-    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_global_block_scheduler;
+    std::shared_ptr<pipeline::BlockedTaskScheduler> _global_block_scheduler;
     // used for query without workload group
-    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_without_group_block_scheduler;
+    std::shared_ptr<pipeline::BlockedTaskScheduler> 
_without_group_block_scheduler;
 
-    doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = 
nullptr;
+    pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
 
     WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr;
 
     RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
+
+    std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index f5f08a71064..2f6ac61d646 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -50,6 +50,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema_cache.h"
 #include "olap/wal/wal_manager.h"
+#include "pipeline/pipeline_tracing.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/block_spill_manager.h"
@@ -201,6 +202,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     // so it should be created before all query begin and deleted after all 
query and daemon thread stoppped
     _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
     init_file_cache_factory();
+    _pipeline_tracer_ctx = 
std::make_unique<pipeline::PipelineTracerContext>(); // before query
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
     _task_group_manager = new taskgroup::TaskGroupManager();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e52b71d277b..c722fb24bb7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -612,7 +612,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
         // This may be a first fragment request of the query.
         // Create the query fragments context.
         query_ctx = QueryContext::create_shared(query_id, 
params.fragment_num_on_host, _exec_env,
-                                                params.query_options, 
params.coord);
+                                                params.query_options, 
params.coord, pipeline);
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), 
params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
         // set file scan range params
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2c25d37d14b..bca8b409c02 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -17,6 +17,9 @@
 
 #include "runtime/query_context.h"
 
+#include <exception>
+#include <memory>
+
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_x/dependency.h"
 #include "runtime/runtime_query_statistics_mgr.h"
@@ -34,11 +37,13 @@ public:
 };
 
 QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, 
ExecEnv* exec_env,
-                           const TQueryOptions& query_options, TNetworkAddress 
coord_addr)
+                           const TQueryOptions& query_options, TNetworkAddress 
coord_addr,
+                           bool is_pipeline)
         : fragment_num(total_fragment_num),
           timeout_second(-1),
           _query_id(query_id),
           _exec_env(exec_env),
+          _is_pipeline(is_pipeline),
           _query_options(query_options) {
     this->coord_addr = coord_addr;
     _start_time = VecDateTimeValue::local_time();
@@ -46,8 +51,8 @@ QueryContext::QueryContext(TUniqueId query_id, int 
total_fragment_num, ExecEnv*
     _shared_scanner_controller.reset(new 
vectorized::SharedScannerController());
     _execution_dependency =
             pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", 
this);
-    _runtime_filter_mgr.reset(
-            new RuntimeFilterMgr(TUniqueId(), 
RuntimeFilterParamsContext::create(this)));
+    _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
+            TUniqueId(), RuntimeFilterParamsContext::create(this));
 
     timeout_second = query_options.execution_timeout;
 
@@ -86,7 +91,7 @@ QueryContext::~QueryContext() {
     // it is found that query already exists in _query_ctx_map, and query mem 
tracker is not used.
     // query mem tracker consumption is not equal to 0 after use, because 
there is memory consumed
     // on query mem tracker, released on other trackers.
-    std::string mem_tracker_msg {""};
+    std::string mem_tracker_msg;
     if (query_mem_tracker->peak_consumption() != 0) {
         mem_tracker_msg = fmt::format(
                 ", deregister query/load memory tracker, queryId={}, Limit={}, 
CurrUsed={}, "
@@ -95,7 +100,9 @@ QueryContext::~QueryContext() {
                 MemTracker::print_bytes(query_mem_tracker->consumption()),
                 
MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
     }
+    uint64_t group_id = 0;
     if (_task_group) {
+        group_id = _task_group->id(); // before remove
         _task_group->remove_mem_tracker_limiter(query_mem_tracker);
         _task_group->remove_query(_query_id);
     }
@@ -110,6 +117,15 @@ QueryContext::~QueryContext() {
         
static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                 
std::make_shared<DelayReleaseToken>(std::move(_thread_token))));
     }
+
+    //TODO: check if pipeline and tracing both enabled
+    if (_is_pipeline && 
ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
+        try {
+            
ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, 
group_id);
+        } catch (std::exception& e) {
+            LOG(WARNING) << "Dump trace log failed bacause " << e.what();
+        }
+    }
 }
 
 void QueryContext::set_ready_to_execute(bool is_cancelled) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a639268c552..a7632a443bf 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -33,7 +33,6 @@
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_predicate.h"
 #include "task_group/task_group.h"
-#include "util/pretty_printer.h"
 #include "util/threadpool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/shared_hash_table_controller.h"
@@ -61,6 +60,7 @@ struct ReportStatusRequest {
     std::function<Status(Status)> update_fn;
     std::function<void(const PPlanFragmentCancelReason&, const std::string&)> 
cancel_fn;
 };
+
 // Save the common components of fragments in a query.
 // Some components like DescriptorTbl may be very large
 // that will slow down each execution of fragments when DeSer them every time.
@@ -70,7 +70,7 @@ class QueryContext {
 
 public:
     QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
-                 const TQueryOptions& query_options, TNetworkAddress 
coord_addr);
+                 const TQueryOptions& query_options, TNetworkAddress 
coord_addr, bool is_pipeline);
 
     ~QueryContext();
 
@@ -216,7 +216,7 @@ public:
 
     ThreadPool* get_non_pipe_exec_thread_pool();
 
-    int64_t mem_limit() { return _bytes_limit; }
+    int64_t mem_limit() const { return _bytes_limit; }
 
     void set_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
@@ -256,6 +256,7 @@ private:
     ExecEnv* _exec_env = nullptr;
     VecDateTimeValue _start_time;
     int64_t _bytes_limit = 0;
+    bool _is_pipeline = false;
 
     // A token used to submit olap scanner to the "_limited_scan_thread_pool",
     // This thread pool token is created from "_limited_scan_thread_pool" from 
exec env.
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 6c961959aec..219a9534831 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -20,13 +20,13 @@
 #include <event2/bufferevent.h>
 #include <event2/http.h>
 
-#include <algorithm>
 #include <string>
 #include <vector>
 
 #include "common/config.h"
 #include "common/status.h"
 #include "http/action/adjust_log_level.h"
+#include "http/action/adjust_tracing_dump.h"
 #include "http/action/check_rpc_channel_action.h"
 #include "http/action/check_tablet_segment_action.h"
 #include "http/action/checksum_action.h"
@@ -99,6 +99,7 @@ HttpService::~HttpService() {
     stop();
 }
 
+// NOLINTBEGIN(readability-function-size)
 Status HttpService::start() {
     add_default_path_handlers(_web_page_handler.get());
 
@@ -162,6 +163,11 @@ Status HttpService::start() {
     AdjustLogLevelAction* adjust_log_level_action = _pool.add(new 
AdjustLogLevelAction());
     _ev_http_server->register_handler(HttpMethod::POST, "api/glog/adjust", 
adjust_log_level_action);
 
+    //TODO: add query GET interface
+    auto* adjust_tracing_dump = _pool.add(new AdjustTracingDump());
+    _ev_http_server->register_handler(HttpMethod::POST, "api/pipeline/tracing",
+                                      adjust_tracing_dump);
+
     // Register BE version action
     VersionAction* version_action =
             _pool.add(new VersionAction(_env, TPrivilegeHier::GLOBAL, 
TPrivilegeType::NONE));
@@ -201,8 +207,9 @@ Status HttpService::start() {
 
     // register metrics
     {
-        auto action = _pool.add(new 
MetricsAction(DorisMetrics::instance()->metric_registry(), _env,
-                                                  TPrivilegeHier::GLOBAL, 
TPrivilegeType::NONE));
+        auto* action =
+                _pool.add(new 
MetricsAction(DorisMetrics::instance()->metric_registry(), _env,
+                                            TPrivilegeHier::GLOBAL, 
TPrivilegeType::NONE));
         _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action);
     }
 
@@ -310,6 +317,7 @@ Status HttpService::start() {
     _ev_http_server->start();
     return Status::OK();
 }
+// NOLINTEND(readability-function-size)
 
 void HttpService::stop() {
     if (stopped) {
diff --git a/be/src/util/thrift_util.cpp b/be/src/util/thrift_util.cpp
index fd141f3c74b..395c01ec390 100644
--- a/be/src/util/thrift_util.cpp
+++ b/be/src/util/thrift_util.cpp
@@ -24,18 +24,17 @@
 #include <thrift/transport/TTransportException.h>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
+#include <string>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/logging.h"
+#include "exec/tablet_info.h"
+#include "olap/tablet_schema.h"
 #include "util/thrift_server.h"
 
-namespace apache {
-namespace thrift {
-namespace protocol {
+namespace apache::thrift::protocol {
 class TProtocol;
-} // namespace protocol
-} // namespace thrift
-} // namespace apache
+} // namespace apache::thrift::protocol
 
 // TCompactProtocol requires some #defines to work right.  They also define 
UNLIKELY
 // so we need to undef this.
@@ -152,4 +151,27 @@ bool t_network_address_comparator(const TNetworkAddress& 
a, const TNetworkAddres
 
     return false;
 }
+
+std::string to_string(const TUniqueId& id) {
+    return std::to_string(id.hi).append(std::to_string(id.lo));
+}
+
+bool _has_inverted_index_or_partial_update(TOlapTableSink sink) {
+    OlapTableSchemaParam schema;
+    if (!schema.init(sink.schema).ok()) {
+        return false;
+    }
+    if (schema.is_partial_update()) {
+        return true;
+    }
+    for (const auto& index_schema : schema.indexes()) {
+        for (const auto& index : index_schema->indexes) {
+            if (index->index_type() == INVERTED) {
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 } // namespace doris
diff --git a/be/src/util/thrift_util.h b/be/src/util/thrift_util.h
index b16482df610..aff3a9ab101 100644
--- a/be/src/util/thrift_util.h
+++ b/be/src/util/thrift_util.h
@@ -17,11 +17,13 @@
 
 #pragma once
 
-#include <stdint.h>
-#include <string.h>
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Types_types.h>
 #include <thrift/TApplicationException.h>
 #include <thrift/transport/TBufferTransports.h>
 
+#include <cstdint>
+#include <cstring>
 #include <exception>
 #include <memory>
 #include <string>
@@ -29,14 +31,10 @@
 
 #include "common/status.h"
 
-namespace apache {
-namespace thrift {
-namespace protocol {
+namespace apache::thrift::protocol {
 class TProtocol;
 class TProtocolFactory;
-} // namespace protocol
-} // namespace thrift
-} // namespace apache
+} // namespace apache::thrift::protocol
 
 namespace doris {
 
@@ -61,7 +59,7 @@ public:
         uint8_t* buffer = nullptr;
         RETURN_IF_ERROR(serialize<T>(obj, &len, &buffer));
         result->resize(len);
-        memcpy(&((*result)[0]), buffer, len);
+        memcpy(result->data(), buffer, len);
         return Status::OK();
     }
 
@@ -177,4 +175,8 @@ void t_network_address_to_string(const TNetworkAddress& 
address, std::string* ou
 // string representation
 bool t_network_address_comparator(const TNetworkAddress& a, const 
TNetworkAddress& b);
 
+PURE std::string to_string(const TUniqueId& id);
+
+PURE bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
+
 } // namespace doris
diff --git a/build.sh b/build.sh
index 7b67a303db1..ffc73dcab23 100755
--- a/build.sh
+++ b/build.sh
@@ -727,6 +727,7 @@ EOF
     cp -r -p "${DORIS_THIRDPARTY}/installed/webroot"/* 
"${DORIS_OUTPUT}/be/www"/
     copy_common_files "${DORIS_OUTPUT}/be/"
     mkdir -p "${DORIS_OUTPUT}/be/log"
+    mkdir -p "${DORIS_OUTPUT}/be/log/tracing"
     mkdir -p "${DORIS_OUTPUT}/be/storage"
 fi
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to