This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fed34f  [optimize] provide a better defer operator (#5706)
5fed34f is described below

commit 5fed34fcfe296e312653a3a30afffc5a13b9e854
Author: stdpain <[email protected]>
AuthorDate: Wed May 12 10:37:23 2021 +0800

    [optimize] provide a better defer operator (#5706)
---
 be/src/olap/schema_change.cpp                      | 67 +++++++++++++---------
 be/src/runtime/routine_load/data_consumer.cpp      | 19 +++---
 .../routine_load/routine_load_task_executor.cpp    |  5 +-
 be/src/runtime/sorted_run_merger.cc                |  2 +-
 be/src/util/defer_op.h                             | 19 ++++--
 be/src/util/file_utils.cpp                         |  8 +--
 be/src/util/mysql_load_error_hub.cpp               |  2 +-
 7 files changed, 68 insertions(+), 54 deletions(-)

diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 43d3a2a..e738897 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -74,8 +74,8 @@ public:
     explicit RowBlockMerger(TabletSharedPtr tablet);
     virtual ~RowBlockMerger();
 
-    bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* 
rowset_writer, std::shared_ptr<MemTracker> parent,
-               uint64_t* merged_rows);
+    bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* 
rowset_writer,
+               std::shared_ptr<MemTracker> parent, uint64_t* merged_rows);
 
 private:
     struct MergeElement {
@@ -713,7 +713,8 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
     return true;
 }
 
-RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, 
std::shared_ptr<MemTracker> parent, size_t memory_limitation)
+RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema,
+                                     std::shared_ptr<MemTracker> parent, 
size_t memory_limitation)
         : _tablet_schema(tablet_schema),
           _mem_tracker(MemTracker::CreateTracker(-1, "RowBlockAllocator", 
parent, false)),
           _row_len(tablet_schema.row_size()),
@@ -723,16 +724,18 @@ RowBlockAllocator::RowBlockAllocator(const TabletSchema& 
tablet_schema, std::sha
 
 RowBlockAllocator::~RowBlockAllocator() {
     if (_mem_tracker->consumption() != 0) {
-        LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size=" << 
_mem_tracker->consumption();
+        LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size="
+                     << _mem_tracker->consumption();
     }
 }
 
 OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, 
bool null_supported) {
     size_t row_block_size = _row_len * num_rows;
 
-    if (_memory_limitation > 0 && _mem_tracker->consumption() + row_block_size 
> _memory_limitation) {
+    if (_memory_limitation > 0 &&
+        _mem_tracker->consumption() + row_block_size > _memory_limitation) {
         LOG(WARNING) << "RowBlockAllocator::alocate() memory exceeded. "
-                    << "m_memory_allocated=" << _mem_tracker->consumption();
+                     << "m_memory_allocated=" << _mem_tracker->consumption();
         *row_block = nullptr;
         return OLAP_SUCCESS;
     }
@@ -751,7 +754,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** 
row_block, size_t num_rows, bo
 
     _mem_tracker->Consume(row_block_size);
     VLOG_NOTICE << "RowBlockAllocator::allocate() this=" << this << ", 
num_rows=" << num_rows
-            << ", m_memory_allocated=" << _mem_tracker->consumption() << ", 
row_block_addr=" << *row_block;
+                << ", m_memory_allocated=" << _mem_tracker->consumption()
+                << ", row_block_addr=" << *row_block;
     return OLAP_SUCCESS;
 }
 
@@ -765,7 +769,8 @@ void RowBlockAllocator::release(RowBlock* row_block) {
 
     VLOG_NOTICE << "RowBlockAllocator::release() this=" << this
                 << ", num_rows=" << row_block->capacity()
-                << ", m_memory_allocated=" << _mem_tracker->consumption() << 
", row_block_addr=" << row_block;
+                << ", m_memory_allocated=" << _mem_tracker->consumption()
+                << ", row_block_addr=" << row_block;
     delete row_block;
 }
 
@@ -773,11 +778,12 @@ RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : 
_tablet(tablet) {}
 
 RowBlockMerger::~RowBlockMerger() {}
 
-bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, 
RowsetWriter* rowset_writer, std::shared_ptr<MemTracker> parent,
-                           uint64_t* merged_rows) {
+bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, 
RowsetWriter* rowset_writer,
+                           std::shared_ptr<MemTracker> parent, uint64_t* 
merged_rows) {
     uint64_t tmp_merged_rows = 0;
     RowCursor row_cursor;
-    std::shared_ptr<MemTracker> tracker(MemTracker::CreateTracker(-1, 
"RowBlockMerger", parent, false));
+    std::shared_ptr<MemTracker> tracker(
+            MemTracker::CreateTracker(-1, "RowBlockMerger", parent, false));
     std::unique_ptr<MemPool> mem_pool(new MemPool(tracker.get()));
     std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
     if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) {
@@ -898,8 +904,12 @@ OLAPStatus 
LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
     return status;
 }
 
-SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& 
row_block_changer, std::shared_ptr<MemTracker> mem_tracker)
-        : SchemaChange(mem_tracker), _row_block_changer(row_block_changer), 
_row_block_allocator(nullptr), _cursor(nullptr) {}
+SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& 
row_block_changer,
+                                           std::shared_ptr<MemTracker> 
mem_tracker)
+        : SchemaChange(mem_tracker),
+          _row_block_changer(row_block_changer),
+          _row_block_allocator(nullptr),
+          _cursor(nullptr) {}
 
 SchemaChangeDirectly::~SchemaChangeDirectly() {
     VLOG_NOTICE << "~SchemaChangeDirectly()";
@@ -920,7 +930,7 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter* 
rowset_writer, RowBloc
 }
 
 OLAPStatus reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* 
block_handle_ptr, int row_num,
-                     RowBlockAllocator* allocator) {
+                         RowBlockAllocator* allocator) {
     auto& block_handle = *block_handle_ptr;
     if (block_handle == nullptr || block_handle->capacity() < row_num) {
         // release old block and alloc new block
@@ -1010,7 +1020,7 @@ OLAPStatus 
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
         res = _row_block_changer.change_row_block(ref_row_block, 
rowset_reader->version().second,
                                                   new_row_block.get(), 
&filtered_rows);
         RETURN_NOT_OK_LOG(res, "failed to change data in row block.");
-        
+
         // rows filtered by delete handler one by one
         add_filtered_rows(filtered_rows);
 
@@ -1053,7 +1063,8 @@ OLAPStatus 
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
     return res;
 }
 
-SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& 
row_block_changer, std::shared_ptr<MemTracker> mem_tracker,
+SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& 
row_block_changer,
+                                                 std::shared_ptr<MemTracker> 
mem_tracker,
                                                  size_t memory_limitation)
         : SchemaChange(mem_tracker),
           _row_block_changer(row_block_changer),
@@ -1078,8 +1089,8 @@ OLAPStatus 
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
                                             TabletSharedPtr new_tablet,
                                             TabletSharedPtr base_tablet) {
     if (_row_block_allocator == nullptr) {
-        _row_block_allocator =
-                new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), 
_mem_tracker, _memory_limitation);
+        _row_block_allocator = new (nothrow)
+                RowBlockAllocator(new_tablet->tablet_schema(), _mem_tracker, 
_memory_limitation);
         if (_row_block_allocator == nullptr) {
             LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" << 
sizeof(RowBlockAllocator);
             return OLAP_ERR_INPUT_PARAMETER_ERROR;
@@ -1114,7 +1125,7 @@ OLAPStatus 
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
     // src_rowsets to store the rowset generated by internal sorting
     std::vector<RowsetSharedPtr> src_rowsets;
 
-    DeferOp defer([&]() {
+    Defer defer{[&]() {
         // remove the intermediate rowsets generated by internal sorting
         for (auto& row_set : src_rowsets) {
             StorageEngine::instance()->add_unused_rowset(row_set);
@@ -1125,7 +1136,7 @@ OLAPStatus 
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
         }
 
         row_block_arr.clear();
-    });
+    }};
 
     _temp_delta_versions.first = _temp_delta_versions.second;
 
@@ -1349,10 +1360,10 @@ bool 
SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
     return true;
 }
 
-SchemaChangeHandler::SchemaChangeHandler() : 
_mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) {
-    REGISTER_HOOK_METRIC(schema_change_mem_consumption, [this]() {
-      return _mem_tracker->consumption();
-    });
+SchemaChangeHandler::SchemaChangeHandler()
+        : _mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) {
+    REGISTER_HOOK_METRIC(schema_change_mem_consumption,
+                         [this]() { return _mem_tracker->consumption(); });
 }
 
 SchemaChangeHandler::~SchemaChangeHandler() {
@@ -1644,8 +1655,8 @@ OLAPStatus 
SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
         size_t memory_limitation = 
config::memory_limitation_per_thread_for_schema_change;
         LOG(INFO) << "doing schema change with sorting for base_tablet "
                   << base_tablet->full_name();
-        sc_procedure = new (nothrow)
-                SchemaChangeWithSorting(rb_changer, _mem_tracker, 
memory_limitation * 1024 * 1024 * 1024);
+        sc_procedure = new (nothrow) SchemaChangeWithSorting(
+                rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 
1024);
     } else if (sc_directly) {
         LOG(INFO) << "doing schema change directly for base_tablet " << 
base_tablet->full_name();
         sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer, 
_mem_tracker);
@@ -1852,8 +1863,8 @@ OLAPStatus 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         size_t memory_limitation = 
config::memory_limitation_per_thread_for_schema_change;
         LOG(INFO) << "doing schema change with sorting for base_tablet "
                   << sc_params.base_tablet->full_name();
-        sc_procedure = new (nothrow)
-                SchemaChangeWithSorting(rb_changer, _mem_tracker, 
memory_limitation * 1024 * 1024 * 1024);
+        sc_procedure = new (nothrow) SchemaChangeWithSorting(
+                rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 * 
1024);
     } else if (sc_directly) {
         LOG(INFO) << "doing schema change directly for base_tablet "
                   << sc_params.base_tablet->full_name();
diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index 157f69a..301ad24 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -44,8 +44,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
     RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 
     // conf has to be deleted finally
-    auto conf_deleter = [conf]() { delete conf; };
-    DeferOp delete_conf(std::bind<void>(conf_deleter));
+    Defer delete_conf{[conf]() { delete conf; }};
 
     std::stringstream ss;
     ss << BackendOptions::get_localhost() << "_";
@@ -146,11 +145,10 @@ Status KafkaDataConsumer::assign_topic_partitions(
               << " assign topic partitions: " << topic << ", " << ss.str();
 
     // delete TopicPartition finally
-    auto tp_deleter = [&topic_partitions]() {
+    Defer delete_tp{[&topic_partitions]() {
         std::for_each(topic_partitions.begin(), topic_partitions.end(),
                       [](RdKafka::TopicPartition* tp1) { delete tp1; });
-    };
-    DeferOp delete_tp(std::bind<void>(tp_deleter));
+    }};
 
     // assign partition
     RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
@@ -238,8 +236,7 @@ Status 
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
 Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* 
partition_ids) {
     // create topic conf
     RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
-    auto conf_deleter = [tconf]() { delete tconf; };
-    DeferOp delete_conf(std::bind<void>(conf_deleter));
+    Defer delete_conf{[tconf]() { delete tconf; }};
 
     // create topic
     std::string errstr;
@@ -250,8 +247,8 @@ Status 
KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
-    auto topic_deleter = [topic]() { delete topic; };
-    DeferOp delete_topic(std::bind<void>(topic_deleter));
+
+    Defer delete_topic{[topic]() { delete topic; }};
 
     // get topic metadata
     RdKafka::Metadata* metadata = nullptr;
@@ -263,8 +260,8 @@ Status 
KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
         LOG(WARNING) << ss.str();
         return Status::InternalError(ss.str());
     }
-    auto meta_deleter = [metadata]() { delete metadata; };
-    DeferOp delete_meta(std::bind<void>(meta_deleter));
+
+    Defer delete_meta{[metadata]() { delete metadata; }};
 
     // get partition ids
     RdKafka::Metadata::TopicMetadataIterator it;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 6d40c9d..ed0abc9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -293,11 +293,10 @@ void 
RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
         _data_consumer_pool.return_consumer(consumer);
 
         // delete TopicPartition finally
-        auto tp_deleter = [&topic_partitions]() {
+        Defer delete_tp{[&topic_partitions]() {
             std::for_each(topic_partitions.begin(), topic_partitions.end(),
                           [](RdKafka::TopicPartition* tp1) { delete tp1; });
-        };
-        DeferOp delete_tp(std::bind<void>(tp_deleter));
+        }};
     } break;
     default:
         return;
diff --git a/be/src/runtime/sorted_run_merger.cc 
b/be/src/runtime/sorted_run_merger.cc
index 8b6f283..de5805c 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -188,7 +188,7 @@ private:
             // do merge from sender queue data
             _status_backup = _sorted_run(&_input_row_batch_backup);
             _backup_ready = true;
-            DeferOp defer_op([this]() { _batch_prepared_cv.notify_one(); });
+            Defer defer_op{[this]() { _batch_prepared_cv.notify_one(); }};
 
             if (!_status_backup.ok() || _input_row_batch_backup == nullptr || 
_cancel) {
                 if (!_status_backup.ok()) _input_row_batch_backup = nullptr;
diff --git a/be/src/util/defer_op.h b/be/src/util/defer_op.h
index 467ca9a..544e151 100644
--- a/be/src/util/defer_op.h
+++ b/be/src/util/defer_op.h
@@ -23,15 +23,22 @@
 namespace doris {
 
 // This class is used to defer a function call until this object is destructed
-class DeferOp {
+// A Better Defer operator #5576
+// for C++17
+// Defer defer {[]{ call something }};
+//
+// for C++11
+// auto op = [] {};
+// Defer<decltype(op)> defer(op);
+template <class T>
+class Defer {
 public:
-    typedef std::function<void()> DeferFunction;
-    DeferOp(const DeferFunction& func) : _func(func) {}
-
-    ~DeferOp() { _func(); };
+    Defer(T& closure) : _closure(closure) {}
+    Defer(T&& closure) : _closure(std::move(closure)) {}
+    ~Defer() { _closure(); }
 
 private:
-    DeferFunction _func;
+    T _closure;
 };
 
 } // namespace doris
diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp
index 8825226..fd934b5 100644
--- a/be/src/util/file_utils.cpp
+++ b/be/src/util/file_utils.cpp
@@ -26,6 +26,7 @@
 #include <algorithm>
 #include <filesystem>
 #include <iomanip>
+#include <memory>
 #include <sstream>
 
 #include "env/env.h"
@@ -240,16 +241,15 @@ Status FileUtils::copy_file(const std::string& src_path, 
const std::string& dest
     }
 
     const int64_t BUF_SIZE = 8192;
-    char* buf = new char[BUF_SIZE];
-    DeferOp free_buf(std::bind<void>(std::default_delete<char[]>(), buf));
+    std::unique_ptr<char[]> buf = std::make_unique<char[]>(BUF_SIZE);
     int64_t src_length = src_file.length();
     int64_t offset = 0;
     while (src_length > 0) {
         int64_t to_read = BUF_SIZE < src_length ? BUF_SIZE : src_length;
-        if (OLAP_SUCCESS != (src_file.pread(buf, to_read, offset))) {
+        if (OLAP_SUCCESS != (src_file.pread(buf.get(), to_read, offset))) {
             return Status::InternalError("Internal Error");
         }
-        if (OLAP_SUCCESS != (dest_file.pwrite(buf, to_read, offset))) {
+        if (OLAP_SUCCESS != (dest_file.pwrite(buf.get(), to_read, offset))) {
             return Status::InternalError("Internal Error");
         }
 
diff --git a/be/src/util/mysql_load_error_hub.cpp 
b/be/src/util/mysql_load_error_hub.cpp
index 17a387c..920ceea 100644
--- a/be/src/util/mysql_load_error_hub.cpp
+++ b/be/src/util/mysql_load_error_hub.cpp
@@ -70,7 +70,7 @@ Status MysqlLoadErrorHub::write_mysql() {
         return st;
     }
 
-    DeferOp close_mysql_conn(std::bind<void>(&mysql_close, my_conn));
+    Defer close_mysql_conn{[=]() { mysql_close(my_conn); }};
 
     Status status;
     std::stringstream sql_stream;

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to