This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f254870  [Optimize] Put _Tuple_ptrs into mempool when RowBatch is initialized (#6036)
f254870 is described below

commit f254870aeb18752a786586ef5d7ccf952b97f895
Author: Xinyi Zou <zouxinyi...@foxmail.com>
AuthorDate: Wed Jun 30 09:27:53 2021 +0800

    [Optimize] Put _Tuple_ptrs into mempool when RowBatch is initialized (#6036)
---
 be/src/runtime/row_batch.cpp | 44 ++++----------------------------------------
 be/src/runtime/row_batch.h   | 21 ++++++---------------
 2 files changed, 10 insertions(+), 55 deletions(-)

diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 461a389..f0238fa 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -55,13 +55,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
     _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    if (config::enable_partitioned_aggregation) {
-        _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
-        DCHECK(_tuple_ptrs != NULL);
-    } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
-    }
+    _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
 }
 
 // TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
@@ -87,13 +81,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
     _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    if (config::enable_partitioned_aggregation) {
-        _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
-        DCHECK(_tuple_ptrs != nullptr);
-    } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
-    }
+    _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
 
     uint8_t* tuple_data = nullptr;
     if (input_batch.is_compressed()) {
@@ -185,13 +173,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
-    if (config::enable_partitioned_aggregation) {
-        _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
-        DCHECK(_tuple_ptrs != NULL);
-    } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
-    }
+    _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
 
     uint8_t* tuple_data = NULL;
     if (input_batch.is_compressed) {
@@ -285,12 +267,6 @@ void RowBatch::clear() {
     for (int i = 0; i < _blocks.size(); ++i) {
         _blocks[i]->del();
     }
-    if (config::enable_partitioned_aggregation) {
-        DCHECK(_tuple_ptrs != NULL);
-        free(_tuple_ptrs);
-        _mem_tracker->Release(_tuple_ptrs_size);
-        _tuple_ptrs = NULL;
-    }
     _cleared = true;
 }
 
@@ -474,8 +450,6 @@ void RowBatch::reset() {
     _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
     _has_in_flight_row = false;
 
-    // TODO: Change this to Clear() and investigate the repercussions.
-    _tuple_data_pool->free_all();
     _agg_object_pool.reset(new ObjectPool());
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
@@ -493,9 +467,6 @@ void RowBatch::reset() {
     }
     _blocks.clear();
     _auxiliary_mem_usage = 0;
-    if (!config::enable_partitioned_aggregation) {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
-    }
     _need_to_return = false;
     _flush = FlushMode::NO_FLUSH_RESOURCES;
     _needs_deep_copy = false;
@@ -593,14 +564,7 @@ void RowBatch::acquire_state(RowBatch* src) {
     _num_rows = src->_num_rows;
     _capacity = src->_capacity;
     _need_to_return = src->_need_to_return;
-    if (!config::enable_partitioned_aggregation) {
-        // Tuple pointers are allocated from tuple_data_pool_ so are transferred.
-        _tuple_ptrs = src->_tuple_ptrs;
-        src->_tuple_ptrs = NULL;
-    } else {
-        // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
-        std::swap(_tuple_ptrs, src->_tuple_ptrs);
-    }
+    std::swap(_tuple_ptrs, src->_tuple_ptrs);
     src->transfer_resource_ownership(this);
 }
 
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 66e2f69..4ff6624 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -70,7 +70,6 @@ class PRowBatch;
 //
 // A row batch is considered at capacity if all the rows are full or it has accumulated
 // auxiliary memory up to a soft cap. (See _at_capacity_mem_usage comment).
-// TODO: stick _tuple_ptrs into a pool?
 class RowBatch : public RowBatchInterface {
 public:
     /// Flag indicating whether the resources attached to a RowBatch need to be flushed.
@@ -414,22 +413,14 @@ private:
     int _num_tuples_per_row;
     RowDescriptor _row_desc;
 
-    // Array of pointers with _capacity * _num_tuples_per_row elements.
-    // The memory ownership depends on whether legacy joins and aggs are enabled.
+    // Memory is allocated from MemPool, need to investigate the repercussions.
     //
-    // Memory is malloc'd and owned by RowBatch:
-    // If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true
-    // then the memory is owned by this RowBatch and is freed upon its destruction.
-    // This mode is more performant especially with SubplanNodes in the ExecNode tree
-    // because the tuple pointers are not transferred and do not have to be re-created
-    // in every Reset().
+    // In the past, there were malloc'd and MemPool memory allocation methods. 
+    // Malloc'd memory belongs to RowBatch itself, and the latter belongs to MemPool management. 
+    // The memory ownership depends on whether legacy joins and aggs are enabled.
     //
-    // Memory is allocated from MemPool:
-    // Otherwise, the memory is allocated from _tuple_data_pool. As a result, the
-    // pointer memory is transferred just like tuple data, and must be re-created
-    // in Reset(). This mode is required for the legacy join and agg which rely on
-    // the tuple pointers being allocated from the _tuple_data_pool, so they can
-    // acquire ownership of the tuple pointers.
+    // At present, it is allocated from MemPool uniformly, and tuple pointers are not transferred 
+    // and do not have to be re-created in every Reset(), which has better performance.
     Tuple** _tuple_ptrs;
     int _tuple_ptrs_size;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to