Gabriel39 commented on code in PR #27829:
URL: https://github.com/apache/doris/pull/27829#discussion_r1419875804


##########
be/src/runtime/runtime_state.h:
##########
@@ -513,6 +529,8 @@ class RuntimeState {
 
     void resize_op_id_to_local_state(int operator_size, int sink_size);
 
+    std::vector<std::unique_ptr<RuntimeProfile>> _pipeline_id_to_profile;

Review Comment:
   Make it private.



##########
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:
##########
@@ -565,20 +591,21 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                             
pipeline_id_to_task[dep]->get_downstream_dependency());
                 }
             }
-            RETURN_IF_ERROR(prepare_and_set_parent_profile(task));
+            RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
         }
-
         {
             std::lock_guard<std::mutex> l(_state_map_lock);
             _instance_id_to_runtime_state.insert(
                     {UniqueId(_runtime_states[i]->fragment_instance_id()),
                      _runtime_states[i].get()});
+            
_runtime_filter_mgr_map[_runtime_states[i]->per_fragment_instance_idx()] =
+                    std::move(runtime_filter_mgr);
         }
     }
     _pipeline_parent_map.clear();
     _dag.clear();
     _op_id_to_le_state.clear();
-
+    if(_enable_local_shuffle())

Review Comment:
   What is this `if (_enable_local_shuffle())` check for?



##########
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:
##########
@@ -453,41 +456,62 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
     int target_size = request.local_params.size();
     _runtime_states.resize(target_size);
     _tasks.resize(target_size);
+    _runtime_filter_mgr_map.resize(target_size);
+    auto& pipeline_id_to_profile = _runtime_state->_pipeline_id_to_profile;
+    DCHECK(pipeline_id_to_profile.empty());
+    pipeline_id_to_profile.resize(_pipelines.size());
+    {
+        size_t pip_idx = 0;
+        for (auto& pipeline_profile : pipeline_id_to_profile) {
+            pipeline_profile =
+                    std::make_unique<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_idx));
+            pip_idx++;
+        }
+    }
+
     for (size_t i = 0; i < target_size; i++) {
         const auto& local_params = request.local_params[i];
+        std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
+        auto set_runtime_state = [&](std::unique_ptr<RuntimeState>& 
runtime_state) {
+            
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
-        _runtime_states[i] = RuntimeState::create_unique(
-                local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
-                request.query_options, _query_ctx->query_globals, _exec_env);
-        if (local_params.__isset.runtime_filter_params) {
-            
_runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params);
-        }
-        _runtime_states[i]->set_query_ctx(_query_ctx.get());
-        
_runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker);
+            runtime_state->set_query_ctx(_query_ctx.get());
+            runtime_state->set_be_number(local_params.backend_num);
 
-        static_cast<void>(_runtime_states[i]->runtime_filter_mgr()->init());
-        _runtime_states[i]->set_be_number(local_params.backend_num);
+            if (request.__isset.backend_id) {
+                runtime_state->set_backend_id(request.backend_id);
+            }
+            if (request.__isset.import_label) {
+                runtime_state->set_import_label(request.import_label);
+            }
+            if (request.__isset.db_name) {
+                runtime_state->set_db_name(request.db_name);
+            }
+            if (request.__isset.load_job_id) {
+                runtime_state->set_load_job_id(request.load_job_id);
+            }
 
-        if (request.__isset.backend_id) {
-            _runtime_states[i]->set_backend_id(request.backend_id);
-        }
-        if (request.__isset.import_label) {
-            _runtime_states[i]->set_import_label(request.import_label);
-        }
-        if (request.__isset.db_name) {
-            _runtime_states[i]->set_db_name(request.db_name);
-        }
-        if (request.__isset.load_job_id) {
-            _runtime_states[i]->set_load_job_id(request.load_job_id);
-        }
+            runtime_state->set_desc_tbl(_desc_tbl);
+            
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
+            runtime_state->set_num_per_fragment_instances(request.num_senders);
+            runtime_state->resize_op_id_to_local_state(max_operator_id(), 
max_sink_operator_id());
+            
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+            runtime_state->set_total_load_streams(request.total_load_streams);
+            runtime_state->set_num_local_sink(request.num_local_sink);
+            DCHECK(runtime_filter_mgr);
+            
runtime_state->set_pipeline_x_runtime_filter_mgr(runtime_filter_mgr.get());
+        };
+        // build instance runtime state
+        _runtime_states[i] = RuntimeState::create_unique(
+                this, local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
+                request.query_options, _query_ctx->query_globals, _exec_env);
+        // build runtime_filter_mgr for each instance
+        runtime_filter_mgr =
+                std::make_unique<RuntimeFilterMgr>(request.query_id, 
_runtime_states[i].get());
+        
runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params);

Review Comment:
   Add a check for `local_params.__isset.runtime_filter_params` before calling `set_runtime_filter_params`.



##########
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:
##########
@@ -1162,9 +1190,9 @@ Status PipelineXFragmentContext::send_report(bool done) {
     }
 
     return _report_status_cb(
-            {true, exec_status, runtime_states, nullptr, nullptr, done || 
!exec_status.ok(),
-             _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), 
_backend_num,
-             _runtime_state.get(),
+            {true, exec_status, runtime_states, nullptr, 
_runtime_state->load_channel_profile(),

Review Comment:
   Use `task_runtime_states` instead of `runtime_states`.



##########
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:
##########
@@ -504,10 +528,25 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
             }
             return le_state_map;
         };
+
+        auto get_task_runtime_state = [&](int task_id) -> RuntimeState* {
+            if (!_enable_local_shuffle()) {
+                return _runtime_states[i].get();
+            }
+            return _task_runtime_states[task_id].get();

Review Comment:
   We should always use `_task_runtime_states` and remove `_runtime_states`.



##########
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h:
##########
@@ -219,6 +218,21 @@ class PipelineXFragmentContext : public 
PipelineFragmentContext {
     int _sink_operator_id = 0;
     int _num_instances = 0;
     std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> 
_op_id_to_le_state;
+
+    // _per_fragment_instance_idx -> runtime mgr
+    std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
+
+    //Here are three types of runtime states:
+    //    - _runtime state is at the Fragment level.
+    //    - _runtime_states[] is at the instance level, shared by all tasks 
within this instance.
+    //        To handle runtime filters, etc., messages are transmitted among 
different tasks within the instance.
+    //    - _task_runtime_states is at the task level, unique to each task.
+
+    // Local runtime states for each instance
+    std::vector<std::unique_ptr<RuntimeState>> _runtime_states;

Review Comment:
   Remove this.



##########
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp:
##########
@@ -453,41 +456,62 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
     int target_size = request.local_params.size();
     _runtime_states.resize(target_size);
     _tasks.resize(target_size);
+    _runtime_filter_mgr_map.resize(target_size);
+    auto& pipeline_id_to_profile = _runtime_state->_pipeline_id_to_profile;
+    DCHECK(pipeline_id_to_profile.empty());
+    pipeline_id_to_profile.resize(_pipelines.size());
+    {
+        size_t pip_idx = 0;
+        for (auto& pipeline_profile : pipeline_id_to_profile) {
+            pipeline_profile =
+                    std::make_unique<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_idx));
+            pip_idx++;
+        }
+    }
+
     for (size_t i = 0; i < target_size; i++) {
         const auto& local_params = request.local_params[i];
+        std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
+        auto set_runtime_state = [&](std::unique_ptr<RuntimeState>& 
runtime_state) {
+            
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
 
-        _runtime_states[i] = RuntimeState::create_unique(
-                local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
-                request.query_options, _query_ctx->query_globals, _exec_env);
-        if (local_params.__isset.runtime_filter_params) {
-            
_runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params);
-        }
-        _runtime_states[i]->set_query_ctx(_query_ctx.get());
-        
_runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker);
+            runtime_state->set_query_ctx(_query_ctx.get());
+            runtime_state->set_be_number(local_params.backend_num);
 
-        static_cast<void>(_runtime_states[i]->runtime_filter_mgr()->init());
-        _runtime_states[i]->set_be_number(local_params.backend_num);
+            if (request.__isset.backend_id) {
+                runtime_state->set_backend_id(request.backend_id);
+            }
+            if (request.__isset.import_label) {
+                runtime_state->set_import_label(request.import_label);
+            }
+            if (request.__isset.db_name) {
+                runtime_state->set_db_name(request.db_name);
+            }
+            if (request.__isset.load_job_id) {
+                runtime_state->set_load_job_id(request.load_job_id);
+            }
 
-        if (request.__isset.backend_id) {
-            _runtime_states[i]->set_backend_id(request.backend_id);
-        }
-        if (request.__isset.import_label) {
-            _runtime_states[i]->set_import_label(request.import_label);
-        }
-        if (request.__isset.db_name) {
-            _runtime_states[i]->set_db_name(request.db_name);
-        }
-        if (request.__isset.load_job_id) {
-            _runtime_states[i]->set_load_job_id(request.load_job_id);
-        }
+            runtime_state->set_desc_tbl(_desc_tbl);
+            
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
+            runtime_state->set_num_per_fragment_instances(request.num_senders);
+            runtime_state->resize_op_id_to_local_state(max_operator_id(), 
max_sink_operator_id());
+            
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+            runtime_state->set_total_load_streams(request.total_load_streams);
+            runtime_state->set_num_local_sink(request.num_local_sink);
+            DCHECK(runtime_filter_mgr);
+            
runtime_state->set_pipeline_x_runtime_filter_mgr(runtime_filter_mgr.get());
+        };
+        // build instance runtime state
+        _runtime_states[i] = RuntimeState::create_unique(
+                this, local_params.fragment_instance_id, request.query_id, 
request.fragment_id,
+                request.query_options, _query_ctx->query_globals, _exec_env);
+        // build runtime_filter_mgr for each instance
+        runtime_filter_mgr =

Review Comment:
   Do not pass `_runtime_states[i].get()` to this constructor, because that runtime state is at the instance level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to