yiguolei commented on code in PR #20771:
URL: https://github.com/apache/doris/pull/20771#discussion_r1233010193


##########
be/src/vec/sink/vtablet_sink.cpp:
##########
@@ -1393,133 +1385,226 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
     return Status::OK();
 }
 
-Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return _close_status;
+Status VOlapTableSink::_cancel_channel_and_check_intolerable_failure(
+        Status status, const std::string& err_msg, const 
std::shared_ptr<IndexChannel> ich,
+        const std::shared_ptr<VNodeChannel> nch) {
+    LOG(WARNING) << nch->channel_info() << ", close channel failed, err: " << 
err_msg;
+    ich->mark_as_failed(nch->node_id(), nch->host(), err_msg, -1);
+    // cancel the node channel in best effort
+    nch->cancel(err_msg);
+
+    // check if index has intolerable failure
+    Status index_st = ich->check_intolerable_failure();
+    if (!index_st.ok()) {
+        status = index_st;
+    } else if (Status st = ich->check_tablet_received_rows_consistency(); 
!st.ok()) {
+        status = st;
+    }
+    return status;
+}
+
+void VOlapTableSink::_cancel_all_channel(Status status, const std::string& 
err_msg) {
+    for (auto channel : _channels) {
+        channel->for_each_node_channel([&status](const 
std::shared_ptr<VNodeChannel>& ch) {
+            ch->cancel(status.to_string());
+        });
+    }
+    LOG(INFO) << fmt::format(
+            "{}, close olap table sink. load_id={}, txn_id={}, canceled all 
node channels due to "
+            "error: {}",
+            err_msg, print_id(_load_id), _txn_id, status);
+}
+
+bool VOlapTableSink::is_pending_finish() {
+    if (_pending_finish) {
+        bool pending_finish = false;
+        for (auto index_channel : _channels) {
+            index_channel->for_each_node_channel(
+                    [&pending_finish](const std::shared_ptr<VNodeChannel>& ch) 
{
+                        pending_finish |= ch->is_pending_finish();
+                    });
+        }
+        _pending_finish = pending_finish;
+    }
+    return _pending_finish;
+}
+
+void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) {
+    if (!_pending_finish) {
+        return;
     }
     SCOPED_TIMER(_close_timer);
     Status status = exec_status;
     if (status.ok()) {
         // only if status is ok can we call this 
_profile->total_time_counter().
         // if status is not ok, this sink may not be prepared, so that 
_profile is null
         SCOPED_TIMER(_profile->total_time_counter());
-        // BE id -> add_batch method counter
-        std::unordered_map<int64_t, AddBatchCounter> 
node_add_batch_counter_map;
-        int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0, 
actual_consume_ns = 0,
-                total_add_batch_exec_time_ns = 0, max_add_batch_exec_time_ns = 
0,
-                total_wait_exec_time_ns = 0, max_wait_exec_time_ns = 0, 
total_add_batch_num = 0,
-                num_node_channels = 0;
-        VNodeChannelStat channel_stat;
         {
-            if (config::enable_lazy_open_partition) {
-                for (auto index_channel : _channels) {
-                    index_channel->for_each_node_channel(
-                            [](const std::shared_ptr<VNodeChannel>& ch) {
-                                ch->open_partition_wait();
-                            });
-                }
-            }
-
-            for (auto index_channel : _channels) {
-                index_channel->for_each_node_channel(
-                        [](const std::shared_ptr<VNodeChannel>& ch) { 
ch->mark_close(); });
-                num_node_channels += index_channel->num_node_channels();
-            }
-
             for (auto index_channel : _channels) {
-                int64_t add_batch_exec_time = 0;
-                int64_t wait_exec_time = 0;
+                if (!status.ok()) {
+                    break;
+                }
                 index_channel->for_each_node_channel(
-                        [&index_channel, &state, &node_add_batch_counter_map, 
&serialize_batch_ns,
-                         &channel_stat, &queue_push_lock_ns, 
&actual_consume_ns,
-                         &total_add_batch_exec_time_ns, &add_batch_exec_time,
-                         &total_wait_exec_time_ns, &wait_exec_time,
-                         &total_add_batch_num](const 
std::shared_ptr<VNodeChannel>& ch) {
-                            auto s = ch->close_wait(state);
-                            if (!s.ok()) {
-                                auto err_msg = s.to_string();
-                                index_channel->mark_as_failed(ch->node_id(), 
ch->host(), err_msg,
-                                                              -1);
-                                // cancel the node channel in best effort
-                                ch->cancel(err_msg);
-                                LOG(WARNING) << ch->channel_info()
-                                             << ", close channel failed, err: 
" << err_msg;
+                        [this, &index_channel, &status](const 
std::shared_ptr<VNodeChannel>& ch) {
+                            if (!status.ok() || ch->is_closed()) {
+                                return;
+                            }
+                            // first try close, all node channels will 
mark_close()
+                            // second and after try close, only check node 
channel is cancelled,
+                            // such as node channel has rpc error.
+                            if (this->_try_closed) {
+                                if (ch->is_cancelled()) {
+                                    status = 
this->_cancel_channel_and_check_intolerable_failure(
+                                            status, ch->get_cancel_msg(), 
index_channel, ch);
+                                }
+                            } else {
+                                auto s = ch->mark_close();
+                                if (!s.ok()) {
+                                    status = 
this->_cancel_channel_and_check_intolerable_failure(
+                                            status, s.to_string(), 
index_channel, ch);
+                                }
                             }
-                            ch->time_report(&node_add_batch_counter_map, 
&serialize_batch_ns,
-                                            &channel_stat, 
&queue_push_lock_ns, &actual_consume_ns,
-                                            &total_add_batch_exec_time_ns, 
&add_batch_exec_time,
-                                            &total_wait_exec_time_ns, 
&wait_exec_time,
-                                            &total_add_batch_num);
                         });
-
-                if (add_batch_exec_time > max_add_batch_exec_time_ns) {
-                    max_add_batch_exec_time_ns = add_batch_exec_time;
-                }
-                if (wait_exec_time > max_wait_exec_time_ns) {
-                    max_wait_exec_time_ns = wait_exec_time;
-                }
-
-                // check if index has intolerable failure
-                Status index_st = index_channel->check_intolerable_failure();
-                if (!index_st.ok()) {
-                    status = index_st;
-                } else if (Status st = 
index_channel->check_tablet_received_rows_consistency();
-                           !st.ok()) {
-                    status = st;
-                }
             } // end for index channels
         }
-        // TODO need to be improved
-        LOG(INFO) << "total mem_exceeded_block_ns=" << 
channel_stat.mem_exceeded_block_ns
-                  << ", total queue_push_lock_ns=" << queue_push_lock_ns
-                  << ", total actual_consume_ns=" << actual_consume_ns
-                  << ", load id=" << print_id(_load_id);
-
-        COUNTER_SET(_input_rows_counter, _number_input_rows);
-        COUNTER_SET(_output_rows_counter, _number_output_rows);
-        COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
-        COUNTER_SET(_send_data_timer, _send_data_ns);
-        COUNTER_SET(_row_distribution_timer, 
(int64_t)_row_distribution_watch.elapsed_time());
-        COUNTER_SET(_filter_timer, _filter_ns);
-        COUNTER_SET(_append_node_channel_timer, 
channel_stat.append_node_channel_ns);
-        COUNTER_SET(_where_clause_timer, channel_stat.where_clause_ns);
-        COUNTER_SET(_wait_mem_limit_timer, channel_stat.mem_exceeded_block_ns);
-        COUNTER_SET(_validate_data_timer, _validate_data_ns);
-        COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
-        COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
-        COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
-        COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
-        COUNTER_SET(_total_wait_exec_timer, total_wait_exec_time_ns);
-        COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
-        COUNTER_SET(_add_batch_number, total_add_batch_num);
-        COUNTER_SET(_num_node_channels, num_node_channels);
-        // _number_input_rows don't contain num_rows_load_filtered and 
num_rows_load_unselected in scan node
-        int64_t num_rows_load_total = _number_input_rows + 
state->num_rows_load_filtered() +
-                                      state->num_rows_load_unselected();
-        state->set_num_rows_load_total(num_rows_load_total);
-        state->update_num_rows_load_filtered(_number_filtered_rows);
-        
state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows);
-
-        // print log of add batch time of all node, for tracing load 
performance easily
-        std::stringstream ss;
-        ss << "finished to close olap table sink. load_id=" << 
print_id(_load_id)
-           << ", txn_id=" << _txn_id
-           << ", node add batch time(ms)/wait execution time(ms)/close 
time(ms)/num: ";
-        for (auto const& pair : node_add_batch_counter_map) {
-            ss << "{" << pair.first << ":(" << 
(pair.second.add_batch_execution_time_us / 1000)
-               << ")(" << (pair.second.add_batch_wait_execution_time_us / 
1000) << ")("
-               << pair.second.close_wait_time_ms << ")(" << 
pair.second.add_batch_num << ")} ";
+    }
+
+    if (status.ok()) {
+        if (!_try_closed) {
+            LOG(INFO) << "try close olap table sink. load_id=" << 
print_id(_load_id)
+                      << ", txn_id=" << _txn_id;
         }
-        LOG(INFO) << ss.str();
     } else {
-        for (auto channel : _channels) {
-            channel->for_each_node_channel([&status](const 
std::shared_ptr<VNodeChannel>& ch) {
-                ch->cancel(status.to_string());
-            });
+        _cancel_all_channel(status, "in try close");

Review Comment:
   直接一个 status 参数就可以了。我看了后面的 message，基本上是用来说明类似调用栈的，太 debug 了。如果在某个地方需要调用栈，可以生成一个空的 error status 打印一下就行了，这样就可以完整地得到调用栈。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to