This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new f254870 [Optimize] Put _Tuple_ptrs into mempool when RowBatch is initialized (#6036) f254870 is described below commit f254870aeb18752a786586ef5d7ccf952b97f895 Author: Xinyi Zou <zouxinyi...@foxmail.com> AuthorDate: Wed Jun 30 09:27:53 2021 +0800 [Optimize] Put _Tuple_ptrs into mempool when RowBatch is initialized (#6036) --- be/src/runtime/row_batch.cpp | 44 ++++---------------------------------------- be/src/runtime/row_batch.h | 21 ++++++--------------- 2 files changed, 10 insertions(+), 55 deletions(-) diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 461a389..f0238fa 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -55,13 +55,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_ _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*); DCHECK_GT(_tuple_ptrs_size, 0); // TODO: switch to Init() pattern so we can check memory limit and return Status. - if (config::enable_partitioned_aggregation) { - _mem_tracker->Consume(_tuple_ptrs_size); - _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size)); - DCHECK(_tuple_ptrs != NULL); - } else { - _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size)); - } + _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size)); } // TODO: we want our input_batch's tuple_data to come from our (not yet implemented) @@ -87,13 +81,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch, _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*); DCHECK_GT(_tuple_ptrs_size, 0); // TODO: switch to Init() pattern so we can check memory limit and return Status. 
- if (config::enable_partitioned_aggregation) { - _mem_tracker->Consume(_tuple_ptrs_size); - _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size)); - DCHECK(_tuple_ptrs != nullptr); - } else { - _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size)); - } + _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size)); uint8_t* tuple_data = nullptr; if (input_batch.is_compressed()) { @@ -185,13 +173,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch, _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*); DCHECK_GT(_tuple_ptrs_size, 0); // TODO: switch to Init() pattern so we can check memory limit and return Status. - if (config::enable_partitioned_aggregation) { - _mem_tracker->Consume(_tuple_ptrs_size); - _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size)); - DCHECK(_tuple_ptrs != NULL); - } else { - _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size)); - } + _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size)); uint8_t* tuple_data = NULL; if (input_batch.is_compressed) { @@ -285,12 +267,6 @@ void RowBatch::clear() { for (int i = 0; i < _blocks.size(); ++i) { _blocks[i]->del(); } - if (config::enable_partitioned_aggregation) { - DCHECK(_tuple_ptrs != NULL); - free(_tuple_ptrs); - _mem_tracker->Release(_tuple_ptrs_size); - _tuple_ptrs = NULL; - } _cleared = true; } @@ -474,8 +450,6 @@ void RowBatch::reset() { _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*)); _has_in_flight_row = false; - // TODO: Change this to Clear() and investigate the repercussions. 
- _tuple_data_pool->free_all(); _agg_object_pool.reset(new ObjectPool()); for (int i = 0; i < _io_buffers.size(); ++i) { _io_buffers[i]->return_buffer(); @@ -493,9 +467,6 @@ void RowBatch::reset() { } _blocks.clear(); _auxiliary_mem_usage = 0; - if (!config::enable_partitioned_aggregation) { - _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size)); - } _need_to_return = false; _flush = FlushMode::NO_FLUSH_RESOURCES; _needs_deep_copy = false; @@ -593,14 +564,7 @@ void RowBatch::acquire_state(RowBatch* src) { _num_rows = src->_num_rows; _capacity = src->_capacity; _need_to_return = src->_need_to_return; - if (!config::enable_partitioned_aggregation) { - // Tuple pointers are allocated from tuple_data_pool_ so are transferred. - _tuple_ptrs = src->_tuple_ptrs; - src->_tuple_ptrs = NULL; - } else { - // tuple_ptrs_ were allocated with malloc so can be swapped between batches. - std::swap(_tuple_ptrs, src->_tuple_ptrs); - } + std::swap(_tuple_ptrs, src->_tuple_ptrs); src->transfer_resource_ownership(this); } diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h index 66e2f69..4ff6624 100644 --- a/be/src/runtime/row_batch.h +++ b/be/src/runtime/row_batch.h @@ -70,7 +70,6 @@ class PRowBatch; // // A row batch is considered at capacity if all the rows are full or it has accumulated // auxiliary memory up to a soft cap. (See _at_capacity_mem_usage comment). -// TODO: stick _tuple_ptrs into a pool? class RowBatch : public RowBatchInterface { public: /// Flag indicating whether the resources attached to a RowBatch need to be flushed. @@ -414,22 +413,14 @@ private: int _num_tuples_per_row; RowDescriptor _row_desc; - // Array of pointers with _capacity * _num_tuples_per_row elements. - // The memory ownership depends on whether legacy joins and aggs are enabled. + // Memory is allocated from MemPool, need to investigate the repercussions. 
// - // Memory is malloc'd and owned by RowBatch: - // If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true - // then the memory is owned by this RowBatch and is freed upon its destruction. - // This mode is more performant especially with SubplanNodes in the ExecNode tree - // because the tuple pointers are not transferred and do not have to be re-created - // in every Reset(). + // In the past, there were malloc'd and MemPool memory allocation methods. + // Malloc'd memory belongs to RowBatch itself, and the latter belongs to MemPool management. + // The memory ownership depends on whether legacy joins and aggs are enabled. - // - // Memory is allocated from MemPool: - // Otherwise, the memory is allocated from _tuple_data_pool. As a result, the - // pointer memory is transferred just like tuple data, and must be re-created - // in Reset(). This mode is required for the legacy join and agg which rely on - // the tuple pointers being allocated from the _tuple_data_pool, so they can - // acquire ownership of the tuple pointers. + // At present, it is allocated from MemPool uniformly, and tuple pointers are not transferred + // and do not have to be re-created in every Reset(), which has better performance. Tuple** _tuple_ptrs; int _tuple_ptrs_size; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org