chenhao7253886 closed pull request #513: Add cpu and io indicates to audit log
URL: https://github.com/apache/incubator-doris/pull/513
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 2064bf62..3048b95f 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "gen_cpp/DataSinks_types.h"
 #include "gen_cpp/Exprs_types.h"
+#include "runtime/exec_node_consumption_provider.h"
 #include "runtime/mem_tracker.h"
 
 namespace doris {
@@ -78,11 +79,17 @@ class DataSink {
     // Returns the runtime profile for the sink.
     virtual RuntimeProfile* profile() = 0;
 
+    // Records the consumption snapshot that a concrete sink reports; the
+    // fragment executor sets it right before it calls close().
+    void set_query_consumption(
+            const ExecNodeConsumptionProvider::Consumption& consumption) {
+        _query_consumption = consumption;
+    }
+
 protected:
     // Set to true after close() has been called. subclasses should check and 
set this in
     // close().
     bool _closed;
     std::unique_ptr<MemTracker> _expr_mem_tracker;
+
+    ExecNodeConsumptionProvider::Consumption _query_consumption;
 };
 
 }  // namespace doris
diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 4c7828c3..52fbe98e 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -131,6 +131,7 @@ Status ExchangeNode::get_next(RuntimeState* state, 
RowBatch* output_batch, bool*
 
     if (reached_limit()) {
         _stream_recvr->transfer_all_resources(output_batch);
+        set_runtime_consumption(state);
         *eos = true;
         return Status::OK;
     } else {
@@ -179,6 +180,7 @@ Status ExchangeNode::get_next(RuntimeState* state, 
RowBatch* output_batch, bool*
 
             if (reached_limit()) {
                 _stream_recvr->transfer_all_resources(output_batch);
+                set_runtime_consumption(state); 
                 *eos = true;
                 return Status::OK;
             }
@@ -197,6 +199,7 @@ Status ExchangeNode::get_next(RuntimeState* state, 
RowBatch* output_batch, bool*
         RETURN_IF_ERROR(fill_input_row_batch(state));
         *eos = (_input_batch == NULL);
         if (*eos) {
+            set_runtime_consumption(state);
             return Status::OK;
         }
 
@@ -243,6 +246,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, 
RowBatch* output_batc
     // by the merger to the output batch.
     if (*eos) {
         _stream_recvr->transfer_all_resources(output_batch);
+        set_runtime_consumption(state);
     }
 
     COUNTER_SET(_rows_returned_counter, _num_rows_returned);
diff --git a/be/src/exec/exchange_node.h b/be/src/exec/exchange_node.h
index 6450d39f..b800e71f 100644
--- a/be/src/exec/exchange_node.h
+++ b/be/src/exec/exchange_node.h
@@ -21,11 +21,12 @@
 #include <boost/scoped_ptr.hpp>
 #include "exec/exec_node.h"
 #include "exec/sort_exec_exprs.h"
+#include "runtime/data_stream_recvr.h"
+#include "runtime/exec_node_consumption_provider.h"
 
 namespace doris {
 
 class RowBatch;
-class DataStreamRecvr;
 class RuntimeProfile;
 
 // Receiver node for data streams. The data stream receiver is created in 
Prepare()
@@ -61,6 +62,12 @@ class ExchangeNode : public ExecNode {
     virtual void debug_string(int indentation_level, std::stringstream* out) 
const;
 
 private:
+
+    // Folds the consumption collected by the stream receiver into the
+    // RuntimeState. get_next()/get_next_merging() call this when they
+    // reach end of stream.
+    void set_runtime_consumption(RuntimeState* state) {
+        state->add_sub_plan_consumption(
+                _stream_recvr->get_sub_plan_consumption());
+    }
+
     // Implements GetNext() for the case where _is_merging is true. Delegates 
the GetNext()
     // call to the underlying DataStreamRecvr.
     Status get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* 
eos);
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index f4486dea..d79b1794 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -31,9 +31,13 @@ void GetResultBatchCtx::on_failure(const Status& status) {
     delete this;
 }
 
-void GetResultBatchCtx::on_close(int64_t packet_seq) {
+void GetResultBatchCtx::on_close(int64_t packet_seq,
+                 ExecNodeConsumptionProvider::Consumption* consumption) {
     Status status;
     status.to_protobuf(result->mutable_status());
+    if (consumption != nullptr) {
+        consumption->serialize(result->mutable_query_consumption());
+    }
     result->set_packet_seq(packet_seq);
     result->set_eos(true);
     done->Run();
@@ -183,7 +187,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
         return;
     }
     if (_is_close) {
-        ctx->on_close(_packet_num);
+        ctx->on_close(_packet_num, &_consumption);
         return;
     }
     // no ready data, push ctx to waiting list
@@ -200,7 +204,7 @@ Status BufferControlBlock::close(Status exec_status) {
     if (!_waiting_rpc.empty()) {
         if (_status.ok()) {
             for (auto& ctx : _waiting_rpc) {
-                ctx->on_close(_packet_num);
+                ctx->on_close(_packet_num, &_consumption);
             }
         } else {
             for (auto& ctx : _waiting_rpc) {
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index 701ef5f7..c02fa882 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -24,6 +24,7 @@
 #include <boost/thread/condition_variable.hpp>
 #include "common/status.h"
 #include "gen_cpp/Types_types.h"
+#include "runtime/exec_node_consumption_provider.h"
 
 namespace google {
 namespace protobuf {
@@ -52,7 +53,7 @@ struct GetResultBatchCtx {
     }
 
     void on_failure(const Status& status);
-    void on_close(int64_t packet_seq);
+    void on_close(int64_t packet_seq, 
ExecNodeConsumptionProvider::Consumption* consumption = nullptr);
     void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos = 
false);
 };
 
@@ -80,6 +81,9 @@ class BufferControlBlock {
         return _fragment_id;
     }
 
+    // Records the consumption that is handed to GetResultBatchCtx::on_close()
+    // when the block is closed.
+    void set_query_consumption(
+            const ExecNodeConsumptionProvider::Consumption& consumption) {
+        _consumption = consumption;
+    }
 private:
     typedef std::list<TFetchDataResult*> ResultQueue;
 
@@ -100,8 +104,10 @@ class BufferControlBlock {
     boost::condition_variable _data_arriaval;
     // signal removal of data by stream consumer
     boost::condition_variable _data_removal;
-
+   
     std::deque<GetResultBatchCtx*> _waiting_rpc;
+
+    ExecNodeConsumptionProvider::Consumption _consumption;
 };
 
 }
diff --git a/be/src/runtime/data_stream_mgr.cpp 
b/be/src/runtime/data_stream_mgr.cpp
index 0c75fbb0..dee0c938 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -121,7 +121,8 @@ Status DataStreamMgr::add_data(
 Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id,
                                    PlanNodeId dest_node_id,
                                    int sender_id, 
-                                   int be_number) {
+                                   int be_number,
+                                   const PQueryConsumption& consumption) {
     VLOG_FILE << "close_sender(): fragment_instance_id=" << 
fragment_instance_id
         << ", node=" << dest_node_id;
     shared_ptr<DataStreamRecvr> recvr = find_recvr(fragment_instance_id, 
dest_node_id);
@@ -135,6 +136,7 @@ Status DataStreamMgr::close_sender(const TUniqueId& 
fragment_instance_id,
         // errors from receiver-initiated teardowns.
         return Status::OK;
     }
+    recvr->add_sub_plan_consumption(consumption);
     recvr->remove_sender(sender_id, be_number);
     return Status::OK;
 }
diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h
index a4880b65..ca94dc6c 100644
--- a/be/src/runtime/data_stream_mgr.h
+++ b/be/src/runtime/data_stream_mgr.h
@@ -46,6 +46,7 @@ class DataStreamRecvr;
 class RowBatch;
 class RuntimeState;
 class PRowBatch;
+class PQueryConsumption;
 class PUniqueId;
 
 // Singleton class which manages all incoming data streams at a backend node. 
It
@@ -96,7 +97,7 @@ class DataStreamMgr {
     // sender has closed.
     // Returns OK if successful, error status otherwise.
     Status close_sender(const TUniqueId& fragment_instance_id, PlanNodeId 
dest_node_id,
-            int sender_id, int be_number);
+            int sender_id, int be_number, const PQueryConsumption& 
consumption);
 
     // Closes all receivers registered for fragment_instance_id immediately.
     void cancel(const TUniqueId& fragment_instance_id);
diff --git a/be/src/runtime/data_stream_recvr.cc 
b/be/src/runtime/data_stream_recvr.cc
index f007e6f6..84f083de 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -242,6 +242,7 @@ void DataStreamRecvr::SenderQueue::add_batch(
         // it in this thread.
         batch = new RowBatch(_recvr->row_desc(), pb_batch, 
_recvr->mem_tracker());
     }
+   
     VLOG_ROW << "added #rows=" << batch->num_rows()
         << " batch_size=" << batch_size << "\n";
     _batch_queue.emplace_back(batch_size, batch);
@@ -433,4 +434,9 @@ Status DataStreamRecvr::get_batch(RowBatch** next_batch) {
     return _sender_queues[0]->get_batch(next_batch);
 }
 
+// Decodes the consumption carried by a sender's protobuf message and adds
+// it to the running total of this receiver.
+void DataStreamRecvr::add_sub_plan_consumption(
+        const PQueryConsumption& p_consumption) {
+    ExecNodeConsumptionProvider::Consumption reported;
+    reported.deserialize(p_consumption);
+    _sub_plan_consumption.add(reported);
+}
 }
diff --git a/be/src/runtime/data_stream_recvr.h 
b/be/src/runtime/data_stream_recvr.h
index 22bca1ce..7b897f6e 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -24,6 +24,7 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "gen_cpp/Types_types.h" // for TUniqueId
+#include "runtime/exec_node_consumption_provider.h"
 #include "runtime/descriptors.h"
 #include "util/tuple_row_compare.h"
 
@@ -99,6 +100,11 @@ class DataStreamRecvr {
     const RowDescriptor& row_desc() const { return _row_desc; }
     MemTracker* mem_tracker() const { return _mem_tracker.get(); }
 
+    void add_sub_plan_consumption(const PQueryConsumption& p_consumption);
+
+    // Returns a copy of the consumption accumulated so far through
+    // add_sub_plan_consumption().
+    ExecNodeConsumptionProvider::Consumption get_sub_plan_consumption() {
+        return _sub_plan_consumption;
+    }
 private:
     friend class DataStreamMgr;
     class SenderQueue;
@@ -194,6 +200,7 @@ class DataStreamRecvr {
     // Wall time senders spend waiting for the recv buffer to have capacity.
     RuntimeProfile::Counter* _buffer_full_wall_timer;
 
+    ExecNodeConsumptionProvider::Consumption _sub_plan_consumption; 
     // Total time spent waiting for data to arrive in the recv buffer
     // RuntimeProfile::Counter* _data_arrival_timer;
 };
diff --git a/be/src/runtime/data_stream_sender.cpp 
b/be/src/runtime/data_stream_sender.cpp
index d0c60960..c319692f 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -71,7 +71,7 @@ class DataStreamSender::Channel {
     Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
             const TNetworkAddress& brpc_dest,
             const TUniqueId& fragment_instance_id,
-            PlanNodeId dest_node_id, int buffer_size) :
+            PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain) :
         _parent(parent),
         _buffer_size(buffer_size),
         _row_desc(row_desc),
@@ -80,7 +80,8 @@ class DataStreamSender::Channel {
         _num_data_bytes_sent(0),
         _packet_seq(0),
         _need_close(false),
-        _brpc_dest_addr(brpc_dest) {
+        _brpc_dest_addr(brpc_dest),
+        _is_transfer_chain(is_transfer_chain) {
     }
 
     virtual ~Channel() {
@@ -163,6 +164,8 @@ class DataStreamSender::Channel {
     palo::PInternalService_Stub* _brpc_stub = nullptr;
     RefCountClosure<PTransmitDataResult>* _closure = nullptr;
     int32_t _brpc_timeout_ms = 500;
+    // whether the dest can be treated as consumption transfer chain.
+    bool _is_transfer_chain;
 };
 
 Status DataStreamSender::Channel::init(RuntimeState* state) {
@@ -203,6 +206,10 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* 
batch, bool eos) {
     }
     VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
              << " dest_node=" << _dest_node_id;
+    if (eos && _is_transfer_chain) {
+        auto consumption = _brpc_request.mutable_query_consumption();
+        _parent->_query_consumption.serialize(consumption); 
+    }
 
     _brpc_request.set_eos(eos);
     if (batch != nullptr) {
@@ -305,11 +312,16 @@ DataStreamSender::DataStreamSender(
             || sink.output_partition.type == 
TPartitionType::RANGE_PARTITIONED);
     // TODO: use something like google3's linked_ptr here (scoped_ptr isn't 
copyable)
     for (int i = 0; i < destinations.size(); ++i) {
+        bool is_transfer_chain = false;
+        if (destinations[i].__isset.is_transfer_chain) {
+            is_transfer_chain = destinations[i].is_transfer_chain;
+        }
         _channel_shared_ptrs.emplace_back(
             new Channel(this, row_desc,
                         destinations[i].brpc_server,
                         destinations[i].fragment_instance_id,
-                        sink.dest_node_id, per_channel_buffer_size));
+                        sink.dest_node_id, per_channel_buffer_size, 
+                        is_transfer_chain));
         _channels.push_back(_channel_shared_ptrs[i].get());
     }
 }
diff --git a/be/src/runtime/exec_node_consumption_provider.h 
b/be/src/runtime/exec_node_consumption_provider.h
new file mode 100644
index 00000000..a8aae483
--- /dev/null
+++ b/be/src/runtime/exec_node_consumption_provider.h
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_RUNTIME_EXEC_NODE_CONSUMPTION_PROVIDER_H
+#define DORIS_BE_RUNTIME_EXEC_NODE_CONSUMPTION_PROVIDER_H
+
+#include "util/runtime_profile.h"
+#include "util/string_util.h"
+#include "gen_cpp/data.pb.h"
+#include "gen_cpp/PlanNodes_types.h"
+
+namespace doris {
+
+// Derives ExecNode resource consumption from a RuntimeProfile. CPU
+// consumption is measured by the number of rows processed and IO
+// consumption is measured by the size of the scans.
+class ExecNodeConsumptionProvider {
+public:
+
+    ExecNodeConsumptionProvider() {
+        init();
+    }
+
+    // A pair of counters (cpu, io); both start at zero.
+    class Consumption {
+    public:
+        Consumption() : cpu(0), io(0) {
+        }
+
+        // Adds the counters of other onto the counters of this object.
+        void add(const Consumption& other) {
+            cpu.add(other.cpu);
+            io.add(other.io);
+        }
+
+        // Copies both counters into the given protobuf message, which
+        // must not be null.
+        void serialize(PQueryConsumption* consumption) {
+            DCHECK(consumption != nullptr);
+            consumption->set_cpu(cpu.load());
+            consumption->set_io(io.load());
+        }
+
+        // Overwrites both counters with the values of the protobuf message.
+        void deserialize(const PQueryConsumption& consumption) {
+            cpu.store(consumption.cpu());
+            io.store(consumption.io());
+        }
+
+        int64_t get_cpu() {
+            return cpu.load();
+        }
+
+        int64_t get_io() {
+            return io.load();
+        }
+
+        // Overwrites both counters.
+        void set(int64_t cpu, int64_t io) {
+            this->cpu.store(cpu);
+            this->io.store(io);
+        }
+
+        Consumption& operator=(const Consumption& other) {
+            if (this != &other) {
+                set(other.cpu, other.io);
+            }
+            return *this;
+        }
+    private:
+        AtomicInt64 cpu;
+        AtomicInt64 io;
+    };
+
+    // Sums the consumption of every profile returned by
+    // profile->get_all_children() whose name looks like an ExecNode
+    // profile and whose node type is registered in init().
+    Consumption get_consumption(RuntimeProfile* profile) {
+        Consumption total_consumption;
+        std::vector<RuntimeProfile*> all_profiles;
+        profile->get_all_children(&all_profiles);
+        for (auto child : all_profiles) {
+            // ExecNode's RuntimeProfile name is "$ExecNode_type_name (id=?)"
+            std::vector<std::string> elements;
+            boost::split(elements, child->name(), boost::is_any_of(" "),
+                         boost::token_compress_off);
+            // Check the shape of the name first, so that profiles which
+            // do not follow the ExecNode pattern are never dispatched on.
+            if (elements.size() != 2) {
+                continue;
+            }
+            Consumption consumption;
+            if (get_consumption(child, &consumption, elements[0])) {
+                total_consumption.add(consumption);
+            }
+        }
+        return total_consumption;
+    }
+
+private:
+
+    // Registers one reader function per supported ExecNode type name.
+    void init() {
+        functions["OLAP_SCAN_NODE"] = get_olap_scan_consumption;
+        functions["HASH_JOIN_NODE"] = get_hash_join_consumption;
+        functions["AGGREGATION_NODE"] = get_hash_agg_consumption;
+        functions["SORT_NODE"] = get_sort_consumption;
+        functions["ANALYTIC_EVAL_NODE"] = get_windows_consumption;
+        functions["UNION_NODE"] = get_union_consumption;
+        functions["EXCHANGE_NODE"] = get_exchange_consumption;
+    }
+
+    // Runs the reader registered for name, if any. Returns true when a
+    // reader was found and executed, false otherwise.
+    bool get_consumption(RuntimeProfile* profile, Consumption* consumption,
+                         const std::string& name) {
+        // find() rather than operator[]: operator[] would insert an empty
+        // entry into the map for every unknown profile name.
+        auto it = functions.find(name);
+        if (it == functions.end() || it->second == nullptr) {
+            return false;
+        }
+        it->second(profile, consumption);
+        return true;
+    }
+
+    // Returns the value of the named counter, or 0 when get_counter()
+    // yields a null pointer for that name.
+    static int64_t counter_value(RuntimeProfile* profile, const char* name) {
+        RuntimeProfile::Counter* counter = profile->get_counter(name);
+        return counter != nullptr ? counter->value() : 0;
+    }
+
+    static void get_olap_scan_consumption(RuntimeProfile* profile,
+                                          Consumption* consumption) {
+        consumption->set(0, counter_value(profile, "CompressedBytesRead"));
+    }
+
+    static void get_hash_join_consumption(RuntimeProfile* profile,
+                                          Consumption* consumption) {
+        consumption->set(counter_value(profile, "ProbeRows")
+                         + counter_value(profile, "BuildRows"), 0);
+    }
+
+    static void get_hash_agg_consumption(RuntimeProfile* profile,
+                                         Consumption* consumption) {
+        consumption->set(counter_value(profile, "BuildRows"), 0);
+    }
+
+    static void get_sort_consumption(RuntimeProfile* profile,
+                                     Consumption* consumption) {
+        consumption->set(counter_value(profile, "SortRows"), 0);
+    }
+
+    static void get_windows_consumption(RuntimeProfile* profile,
+                                        Consumption* consumption) {
+        consumption->set(counter_value(profile, "ProcessRows"), 0);
+    }
+
+    static void get_union_consumption(RuntimeProfile* profile,
+                                      Consumption* consumption) {
+        consumption->set(counter_value(profile, "MaterializeRows"), 0);
+    }
+
+    static void get_exchange_consumption(RuntimeProfile* profile,
+                                         Consumption* consumption) {
+        RuntimeProfile::Counter* merge_counter =
+                profile->get_counter("MergeRows");
+        // exchange merge sort: presumably the counter exists only for a
+        // merging exchange (TODO confirm); without it the consumption is
+        // left untouched.
+        if (merge_counter != nullptr) {
+            consumption->set(merge_counter->value(), 0);
+        }
+    }
+
+    typedef std::function<void(RuntimeProfile*, Consumption*)> ConsumptionFunc;
+    // ExecNode type name to function
+    std::map<std::string, ConsumptionFunc> functions;
+};
+
+}
+
+#endif
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 1e769ee6..db624001 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -333,6 +333,8 @@ Status PlanFragmentExecutor::open_internal() {
     // audit the sinks to check that this is ok, or change that behaviour.
     {
         SCOPED_TIMER(profile()->total_time_counter());
+        ExecNodeConsumptionProvider::Consumption consumption = 
runtime_state()->get_consumption(); 
+        _sink->set_query_consumption(consumption);
         Status status = _sink->close(runtime_state(), _status);
         RETURN_IF_ERROR(status);
     }
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index b5eb2c97..1b08ec32 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -82,6 +82,8 @@ Status ResultSink::close(RuntimeState* state, Status 
exec_status) {
     }
     // close sender, this is normal path end
     if (_sender) {
+        // Hand the ExecNode consumption to the sender before closing it.
+        _sender->set_query_consumption(_query_consumption);  
         _sender->close(exec_status);
     }
     state->exec_env()->result_mgr()->cancel_at_time(time(NULL) + 
config::result_buffer_cancelled_interval_time, 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index b58ef954..a6cb40be 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -33,6 +33,7 @@
 
 #include "common/global_types.h"
 #include "util/logging.h"
+#include "runtime/exec_node_consumption_provider.h"
 #include "runtime/mem_pool.h"
 #include "runtime/thread_resource_mgr.h"
 #include "gen_cpp/Types_types.h"  // for TUniqueId
@@ -492,6 +493,18 @@ class RuntimeState {
         return _is_running;
     }
 
+    // Adds the given consumption to the total kept for sub plans.
+    void add_sub_plan_consumption(
+            const ExecNodeConsumptionProvider::Consumption& consumption) {
+        _sub_plan_consumption.add(consumption);
+    }
+
+    // Returns the consumption computed from this state's profile plus the
+    // consumption accumulated through add_sub_plan_consumption().
+    ExecNodeConsumptionProvider::Consumption get_consumption() {
+        ExecNodeConsumptionProvider provider;
+        ExecNodeConsumptionProvider::Consumption result =
+                provider.get_consumption(&_profile);
+        result.add(_sub_plan_consumption);
+        return result;
+    }
+
 private:
     // Allow TestEnv to set block_mgr manually for testing.
     friend class TestEnv;
@@ -638,6 +651,9 @@ class RuntimeState {
     /// TODO: not needed if we call ReleaseResources() in a timely manner 
(IMPALA-1575).
     AtomicInt32 _initial_reservation_refcnt;
 
+    // Consumption of sub plans; it should only be updated by ExchangeNode.
+    ExecNodeConsumptionProvider::Consumption _sub_plan_consumption;
+
     // prohibit copies
     RuntimeState(const RuntimeState&);
 };
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 056495a2..496fa891 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -130,14 +130,14 @@ void BackendService::transmit_data(TTransmitDataResult& 
return_val,
     }
 
     if (params.eos) {
-        Status status = _exec_env->stream_mgr()->close_sender(
-                params.dest_fragment_instance_id,
-                params.dest_node_id,
-                params.sender_id,
-                params.be_number);
-        VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false")
-                << " close_sender status: " << status.get_error_msg();
-        status.set_t_status(&return_val);
+        // Status status = _exec_env->stream_mgr()->close_sender(
+        //        params.dest_fragment_instance_id,
+        //        params.dest_node_id,
+        //        params.sender_id,
+        //        params.be_number);
+        //VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false")
+        //        << " close_sender status: " << status.get_error_msg();
+        //status.set_t_status(&return_val);
     }
 }
 
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 2f4b3f34..e5375a29 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -60,7 +60,7 @@ void 
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
         finst_id.__set_lo(request->finst_id().lo());
         _exec_env->stream_mgr()->close_sender(
             finst_id, request->node_id(),
-            request->sender_id(), request->be_number());
+            request->sender_id(), request->be_number(), 
request->query_consumption());
     }
     if (done != nullptr) {
         done->Run();
diff --git 
a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
 
b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
index 21ffef73..88cdbed3 100644
--- 
a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
+++ 
b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java
@@ -79,8 +79,8 @@ private ProcResult requestFragmentExecInfos() throws 
AnalysisException {
             rowData.add(instanceConsumption.getFragmentId());
             rowData.add(instanceConsumption.getInstanceId().toString());
             rowData.add(instanceConsumption.getAddress().toString());
-            
rowData.add(String.valueOf(instanceConsumption.getTotalIoConsumption()));
-            
rowData.add(String.valueOf(instanceConsumption.getTotalCpuConsumption()));
+            rowData.add(instanceConsumption.getFormattingIoConsumption());
+            rowData.add(instanceConsumption.getFormattingCpuConsumption());
             sortedRowDatas.add(rowData);
         }
 
diff --git 
a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java 
b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
index f18d9d13..647efca8 100644
--- 
a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++ 
b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -36,6 +36,7 @@
 import org.apache.logging.log4j.Logger;
 
 import java.util.Collection;
+import java.util.Formatter;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -334,7 +335,7 @@ private String parsePossibleExecNodeName(String str) {
             }
         }
 
-        public long getTotalCpuConsumption() {
+        private long getTotalCpuConsumption() {
             long cpu = 0;
             for (ConsumptionCalculator consumption : calculators) {
                 cpu += consumption.getCpu();
@@ -342,13 +343,27 @@ public long getTotalCpuConsumption() {
             return cpu;
         }
 
-        public long getTotalIoConsumption() {
+        private long getTotalIoConsumption() {
             long io = 0;
             for (ConsumptionCalculator consumption : calculators) {
                 io += consumption.getIo();
             }
             return io;
         }
+
+        // Total CPU consumption rendered as "<row count> Rows".
+        public String getFormattingCpuConsumption() {
+            return getTotalCpuConsumption() + " Rows";
+        }
+
+        // Total IO consumption rendered as "<value with two decimals> <unit>",
+        // where value and unit come from DebugUtil.getByteUint().
+        public String getFormattingIoConsumption() {
+            final Pair<Double, String> pair =
+                    DebugUtil.getByteUint(getTotalIoConsumption());
+            // String.format avoids creating a java.util.Formatter that is
+            // never closed; the resulting text is the same.
+            return String.format("%.2f", pair.first) + " " + pair.second;
+        }
     }
 
     public static class InstanceConsumption extends Consumption {
diff --git 
a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
 
b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
index a59c3dbe..9f16be1b 100644
--- 
a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
+++ 
b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java
@@ -76,8 +76,8 @@ public ProcResult fetchResult() throws AnalysisException {
             values.add(item.getDb());
             values.add(item.getUser());
             final CurrentQueryInfoProvider.Consumption consumption = 
consumptions.get(item.getQueryId());
-            values.add(String.valueOf(consumption.getTotalIoConsumption()));
-            values.add(String.valueOf(consumption.getTotalCpuConsumption()));
+            values.add(consumption.getFormattingIoConsumption());
+            values.add(consumption.getFormattingCpuConsumption());
             values.add(item.getQueryExecTime());
             sortedRowData.add(values);
         }
diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 560d538b..7fada7e5 100644
--- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -37,6 +37,7 @@
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
 import org.apache.logging.log4j.LogManager;
@@ -92,12 +93,16 @@ private void handlePing() {
         ctx.getState().setOk();
     }
 
-    private void auditAfterExec(String origStmt, StatementBase parsedStmt) {
+    private void auditAfterExec(String origStmt, StatementBase parsedStmt,
+                StmtExecutor.QueryConsumption queryConsumption) {
         // slow query
         long elapseMs = System.currentTimeMillis() - ctx.getStartTime();
         // query state log
         ctx.getAuditBuilder().put("state", ctx.getState());
         ctx.getAuditBuilder().put("time", elapseMs);
+        Preconditions.checkNotNull(queryConsumption); 
+        ctx.getAuditBuilder().put("cpu", 
queryConsumption.getFormattingCpuConsumption());
+        ctx.getAuditBuilder().put("io", 
queryConsumption.getFormattingIoConsumption());
         ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows());
         ctx.getAuditBuilder().put("stmt_id", ctx.getStmtId());
 
@@ -177,7 +182,8 @@ private void handleQuery() {
 
         // audit after exec
         // replace '\n' to '\\n' to make string in one line
-        auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt());
+        auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt(), 
+                executor.getQueryConsumptionForAuditLog());
     }
 
     // Get the column definitions of a table
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 252a9984..8947718b 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -574,12 +574,12 @@ void updateStatus(Status status) {
         }
     }
 
-    TResultBatch getNext() throws Exception {
+    public RowBatch getNext() throws Exception {
         if (receiver == null) {
             throw new UserException("There is no receiver.");
         }
 
-        TResultBatch resultBatch;
+        RowBatch resultBatch;
         Status status = new Status();
 
         resultBatch = receiver.getNext(status);
@@ -611,7 +611,7 @@ TResultBatch getNext() throws Exception {
             }
         }
 
-        if (resultBatch == null) {
+        if (resultBatch.isEos()) {
             this.returnedAllResults = true;
 
             // if this query is a block query do not cancel.
@@ -622,7 +622,7 @@ TResultBatch getNext() throws Exception {
                 cancelInternal();
             }
         } else {
-            numReceivedRows += resultBatch.getRowsSize();
+            numReceivedRows += resultBatch.getBatch().getRowsSize();
         }
 
         return resultBatch;
@@ -746,6 +746,12 @@ private void computeFragmentExecParams() throws Exception {
                 dest.fragment_instance_id = 
destParams.instanceExecParams.get(j).instanceId;
                 dest.server = 
toRpcHost(destParams.instanceExecParams.get(j).host);
                 
dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(j).host));
+                // select first dest as consumption transfer chain.
+                if (j == 0) {
+                    dest.setIs_transfer_chain(true);
+                } else {
+                    dest.setIs_transfer_chain(false);
+                }
                 params.destinations.add(dest);
             }
         }
@@ -1038,7 +1044,7 @@ private void getAllNodes(PlanNode plan, List<PlanNode> 
nodeList) {
         return result;
     }
 
-    public void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) 
+    private void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) 
          throws UserException {
         int maxNumInstance = queryOptions.mt_dop;
         if (maxNumInstance == 0) {
@@ -1150,7 +1156,7 @@ private void validate() {
     }
     
     // create collocated instance according to inputFragments
-    public void createCollocatedInstance(FragmentExecParams fragmentExecParams) {
+    private void createCollocatedInstance(FragmentExecParams fragmentExecParams) {
         Preconditions.checkState(fragmentExecParams.inputFragments.size() >= 
1);
         final FragmentExecParams inputFragmentParams = 
fragmentExecParamsMap.get(fragmentExecParams.
                 inputFragments.get(0));
@@ -1169,7 +1175,7 @@ private TUniqueId getNextInstanceId() {
     }
     
     
-    public void createUnionInstance(FragmentExecParams fragmentExecParams) {
+    private void createUnionInstance(FragmentExecParams fragmentExecParams) {
         final PlanFragment fragment = fragmentExecParams.fragment;
         // Add hosts of scan nodes
         List<PlanNodeId> scanNodeIds = findScanNodes(fragment.getPlanRoot());
@@ -1563,7 +1569,6 @@ public FragmentExecParams(PlanFragment fragment) {
                 params.setResource_info(tResourceInfo);
                 params.params.setQuery_id(queryId);
                 
params.params.setFragment_instance_id(instanceExecParam.instanceId);
-                
                 Map<Integer, List<TScanRangeParams>> scanRanges = 
instanceExecParam.perNodeScanRanges;
                 if (scanRanges == null) {
                     scanRanges = Maps.newHashMap();
diff --git a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java
index acb87f13..101fa70d 100644
--- a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -58,11 +58,11 @@ public ResultReceiver(TUniqueId tid, Long backendId, 
TNetworkAddress address, in
         this.timeoutTs = System.currentTimeMillis() + timeoutMs;
     }
 
-    public TResultBatch getNext(Status status) throws TException {
+    public RowBatch getNext(Status status) throws TException {
         if (isDone) {
             return null;
         }
-        
+        final RowBatch rowBatch = new RowBatch();
         try {
             while (!isDone && !isCancel) {
                 PFetchDataRequest request = new PFetchDataRequest(finstId);
@@ -90,6 +90,10 @@ public TResultBatch getNext(Status status) throws TException 
{
                 if (code != TStatusCode.OK) {
                     status.setPstatus(pResult.status);
                     return null;
+                } 
+ 
+                if (pResult.queryConsumption != null) {
+                    rowBatch.setQueryConsumption(pResult.queryConsumption);
                 }
 
                 if (packetIdx != pResult.packetSeq) {
@@ -106,7 +110,9 @@ public TResultBatch getNext(Status status) throws 
TException {
                     TResultBatch resultBatch = new TResultBatch();
                     TDeserializer deserializer = new TDeserializer();
                     deserializer.deserialize(resultBatch, serialResult);
-                    return resultBatch;
+                    rowBatch.setBatch(resultBatch);
+                    rowBatch.setEos(pResult.eos);
+                    return rowBatch;
                 }
             }
         } catch (RpcException e) {
@@ -134,7 +140,7 @@ public TResultBatch getNext(Status status) throws 
TException {
         if (isCancel) {
             status.setStatus(Status.CANCELLED);
         }
-        return null;
+        return rowBatch;
     }
 
     public void cancel() {
diff --git a/fe/src/main/java/org/apache/doris/qe/RowBatch.java 
b/fe/src/main/java/org/apache/doris/qe/RowBatch.java
new file mode 100644
index 00000000..c86d9331
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/qe/RowBatch.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.rpc.PQueryConsumption;
+import org.apache.doris.thrift.TResultBatch;
+
+public final class RowBatch {
+    private TResultBatch batch;
+    private PQueryConsumption queryConsumption;
+    private boolean eos;
+
+    public RowBatch() {
+        eos = true;
+    }
+
+    public TResultBatch getBatch() {
+        return batch;
+    }
+
+    public void setBatch(TResultBatch batch) {
+        this.batch = batch;
+    }
+
+    public PQueryConsumption getQueryConsumption() {
+        return queryConsumption;
+    }
+
+    public void setQueryConsumption(PQueryConsumption queryConsumption) {
+        this.queryConsumption = queryConsumption;
+    }
+
+    public boolean isEos() {
+        return eos;
+    }
+
+    public void setEos(boolean eos) {
+        this.eos = eos;
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 53b6c023..32528dc5 100644
--- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -52,6 +53,7 @@
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.NotImplementedException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileManager;
@@ -63,6 +65,7 @@
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.rewrite.ExprRewriter;
+import org.apache.doris.rpc.PQueryConsumption;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TQueryOptions;
@@ -75,6 +78,7 @@
 import java.io.IOException;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
+import java.util.Formatter;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -101,6 +105,7 @@
     private Planner planner;
     private boolean isProxy;
     private ShowResultSet proxyResultSet = null;
+    private QueryConsumption consumptionForAuditLog;
 
     public StmtExecutor(ConnectContext context, String stmt, boolean isProxy) {
         this.context = context;
@@ -537,26 +542,34 @@ private void handleQueryStmt() throws Exception {
         // so We need to send fields after first batch arrived
 
         // send result
-        TResultBatch batch;
+        RowBatch batch;
         MysqlChannel channel = context.getMysqlChannel();
         boolean isSendFields = false;
-        while ((batch = coord.getNext()) != null) {
+      
+        while ((batch = coord.getNext()) != null && !batch.isEos()) {
             if (!isSendFields) {
                 sendFields(queryStmt.getColLabels(), 
queryStmt.getResultExprs());
             }
             isSendFields = true;
-
-            for (ByteBuffer row : batch.getRows()) {
+            for (ByteBuffer row : batch.getBatch().getRows()) {
                 channel.sendOnePacket(row);
             }
-            context.updateReturnRows(batch.getRows().size());
+            context.updateReturnRows(batch.getBatch().getRows().size());
         }
+        setConsumptionForAuditLog(batch);
         if (!isSendFields) {
             sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs());
         }
         context.getState().setEof();
     }
 
+    private void setConsumptionForAuditLog(RowBatch batch) {
+        if (batch != null) {
+            final PQueryConsumption queryConsumption = batch.getQueryConsumption();
+            consumptionForAuditLog = new QueryConsumption(queryConsumption.cpu, queryConsumption.io);
+        }
+    }
+
     // Process a select statement.
     private void handleInsertStmt() throws Exception {
         // Every time set no send flag and clean all data in buffer
@@ -774,4 +787,40 @@ private void handleExportStmt() throws Exception {
         ExportStmt exportStmt = (ExportStmt) parsedStmt;
         context.getCatalog().getExportMgr().addExportJob(exportStmt);
     }
+
+    public QueryConsumption getQueryConsumptionForAuditLog() {
+        if (consumptionForAuditLog == null) {
+            consumptionForAuditLog = new QueryConsumption();
+        }
+        return consumptionForAuditLog;
+    }
+
+    public static class QueryConsumption {
+        private final long cpu;
+        private final long io;
+
+        public QueryConsumption() {
+            this.cpu = 0;
+            this.io = 0;
+        }
+
+        public QueryConsumption(long cpu, long io) {
+            this.cpu = cpu;
+            this.io = io;
+        }
+
+        public String getFormattingCpuConsumption() {
+            final StringBuilder builder = new StringBuilder();
+            builder.append(cpu).append(" Rows");
+            return builder.toString();
+        }
+
+        public String getFormattingIoConsumption() {
+            final Pair<Double, String> pair = DebugUtil.getByteUint(io);
+            final Formatter fmt = new Formatter();
+            final StringBuilder builder = new StringBuilder();
+            builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second);
+            return builder.toString();
+        }
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java 
b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java
index cd4af477..2b1406a8 100644
--- a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java
+++ b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java
@@ -28,4 +28,6 @@
     public long packetSeq;
     @Protobuf(order = 3, required = false)
     public boolean eos;
+    @Protobuf(order = 4, required = false)
+    public PQueryConsumption queryConsumption;
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java 
b/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java
new file mode 100644
index 00000000..3b233914
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import com.baidu.bjf.remoting.protobuf.annotation.Protobuf;
+import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass;
+
+@ProtobufClass
+public class PQueryConsumption {
+    @Protobuf(order = 1, required = false)
+    public long cpu;
+    @Protobuf(order = 2, required = false)
+    public long io;
+}
diff --git a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java 
b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
index e3e4a573..d580159d 100644
--- a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
+++ b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java
@@ -231,6 +231,7 @@ public void testQuery() throws Exception {
         // Mock statement executor
         StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
         qe.execute();
+        EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption());
         EasyMock.expectLastCall().anyTimes();
         EasyMock.replay(qe);
         PowerMock.expectNew(
@@ -254,11 +255,11 @@ public void testQueryFail() throws Exception {
         StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
         qe.execute();
         EasyMock.expectLastCall().andThrow(new IOException("Fail")).anyTimes();
+        EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption());
         EasyMock.replay(qe);
         PowerMock.expectNew(StmtExecutor.class, 
EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class))
                 .andReturn(qe).anyTimes();
         PowerMock.replay(StmtExecutor.class);
-
         processor.processOnce();
         Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand());
     }
@@ -272,6 +273,7 @@ public void testQueryFail2() throws Exception {
         // Mock statement executor
         StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class);
         qe.execute();
+        EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption());
         EasyMock.expectLastCall().andThrow(new 
NullPointerException("Fail")).anyTimes();
         EasyMock.replay(qe);
         PowerMock.expectNew(StmtExecutor.class, 
EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class))
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index bec67edb..8df74801 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -19,6 +19,11 @@ syntax="proto2";
 
 package doris;
 
+message PQueryConsumption {
+    optional int64 cpu = 1;
+    optional int64 io = 2;
+}
+
 message PRowBatch {
     required int32 num_rows = 1;
     repeated int32 row_tuples = 2;
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 71782934..daad8a62 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -41,6 +41,8 @@ message PTransmitDataParams {
     optional PRowBatch row_batch = 6;
     // different per packet
     required int64 packet_seq = 7;
+    // query consumption
+    optional PQueryConsumption query_consumption = 8;
 };
 
 message PTransmitDataResult {
@@ -129,6 +131,7 @@ message PFetchDataResult {
     // valid when status is ok
     optional int64 packet_seq = 2;
     optional bool eos = 3;
+    optional PQueryConsumption query_consumption = 4;
 };
 
 message PTriggerProfileReportRequest {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 4bc4d5ee..77db297f 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -142,6 +142,7 @@ struct TPlanFragmentDestination {
   // ... which is being executed on this server
   2: required Types.TNetworkAddress server
   3: optional Types.TNetworkAddress brpc_server
+  4: optional bool is_transfer_chain
 }
 
 // Parameters for a single execution instance of a particular TPlanFragment


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to