github-actions[bot] commented on code in PR #14243: URL: https://github.com/apache/doris/pull/14243#discussion_r1044053823
########## be/src/pipeline/exec/operator.cpp: ########## @@ -19,53 +19,30 @@ namespace doris::pipeline { -Operator::Operator(OperatorBuilder* operator_builder) +OperatorBase::OperatorBase(OperatorBuilderBase* operator_builder) : _operator_builder(operator_builder), _is_closed(false) {} -bool Operator::is_sink() const { +bool OperatorBase::is_sink() const { return _operator_builder->is_sink(); } -bool Operator::is_source() const { +bool OperatorBase::is_source() const { return _operator_builder->is_source(); } -Status Operator::init(ExecNode* exec_node, RuntimeState* state) { - _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); - if (exec_node) { - exec_node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); - } - return Status::OK(); -} - -Status Operator::prepare(RuntimeState* state) { - _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), - _runtime_profile.get()); - return Status::OK(); -} - -Status Operator::open(RuntimeState* state) { - return Status::OK(); -} - -Status Operator::close(RuntimeState* state) { +Status OperatorBase::close(RuntimeState* state) { Review Comment: warning: parameter 'state' is unused [misc-unused-parameters] ```suggestion Status OperatorBase::close(RuntimeState* /*state*/) { ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -33,55 +42,121 @@ FINISHED = 2 }; -// enum class SinkState : uint8_t { SINK_IDLE = 0, // can send block to sink SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block FINISHED = 2 }; //////////////// DO NOT USE THE UP State //////////////// -class OperatorBuilder; -class Operator; +class OperatorBuilderBase; +class OperatorBase; -using OperatorPtr = std::shared_ptr<Operator>; +using OperatorPtr = std::shared_ptr<OperatorBase>; using Operators = std::vector<OperatorPtr>; -class Operator { +using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>; +using OperatorBuilders = std::vector<OperatorBuilderPtr>; 
+ +class OperatorBuilderBase { public: - explicit Operator(OperatorBuilder* operator_builder); - virtual ~Operator() = default; + OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {} + + virtual ~OperatorBuilderBase() = default; + + virtual OperatorPtr build_operator() = 0; + + virtual bool is_sink() const { return false; } + virtual bool is_source() const { return false; } + + // create the object used by all operator + virtual Status prepare(RuntimeState* state); + + // destory the object used by all operator + virtual void close(RuntimeState* state); + + std::string get_name() const { return _name; } + + RuntimeState* runtime_state() { return _state; } + + virtual const RowDescriptor& row_desc() = 0; + + int32_t id() const { return _id; } + +protected: + const int32_t _id; + const std::string _name; + + RuntimeState* _state = nullptr; + bool _is_closed = false; +}; + +template <typename NodeType> +class OperatorBuilder : public OperatorBuilderBase { +public: + OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) + : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {} + + virtual ~OperatorBuilder() = default; + + const RowDescriptor& row_desc() override { return _node->row_desc(); } + + NodeType* exec_node() const { return _node; } + +protected: + NodeType* _node; +}; + +template <typename SinkType> +class DataSinkOperatorBuilder : public OperatorBuilderBase { +public: + DataSinkOperatorBuilder(int32_t id, const std::string& name, DataSink* sink = nullptr) + : OperatorBuilderBase(id, name), _sink(reinterpret_cast<SinkType*>(sink)) {} + + virtual ~DataSinkOperatorBuilder() = default; Review Comment: warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override] ```suggestion override ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -33,55 +42,121 @@ enum class SourceState : uint8_t { FINISHED = 2 }; -// enum class SinkState : 
uint8_t { SINK_IDLE = 0, // can send block to sink SINK_BUSY = 1, // sink buffer is full, should wait sink to send some block FINISHED = 2 }; //////////////// DO NOT USE THE UP State //////////////// -class OperatorBuilder; -class Operator; +class OperatorBuilderBase; +class OperatorBase; -using OperatorPtr = std::shared_ptr<Operator>; +using OperatorPtr = std::shared_ptr<OperatorBase>; using Operators = std::vector<OperatorPtr>; -class Operator { +using OperatorBuilderPtr = std::shared_ptr<OperatorBuilderBase>; +using OperatorBuilders = std::vector<OperatorBuilderPtr>; + +class OperatorBuilderBase { public: - explicit Operator(OperatorBuilder* operator_builder); - virtual ~Operator() = default; + OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {} + + virtual ~OperatorBuilderBase() = default; + + virtual OperatorPtr build_operator() = 0; + + virtual bool is_sink() const { return false; } + virtual bool is_source() const { return false; } + + // create the object used by all operator + virtual Status prepare(RuntimeState* state); + + // destory the object used by all operator + virtual void close(RuntimeState* state); + + std::string get_name() const { return _name; } + + RuntimeState* runtime_state() { return _state; } + + virtual const RowDescriptor& row_desc() = 0; + + int32_t id() const { return _id; } + +protected: + const int32_t _id; + const std::string _name; + + RuntimeState* _state = nullptr; + bool _is_closed = false; +}; + +template <typename NodeType> +class OperatorBuilder : public OperatorBuilderBase { +public: + OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) + : OperatorBuilderBase(id, name), _node(reinterpret_cast<NodeType*>(exec_node)) {} + + virtual ~OperatorBuilder() = default; Review Comment: warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override] ```suggestion override ``` ########## be/src/pipeline/exec/operator.h: ########## @@ 
-96,20 +171,14 @@ virtual bool can_write() { return false; } // for sink // for pipeline - virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state, - [[maybe_unused]] vectorized::Block* block, - [[maybe_unused]] SourceState& result_state) { - std::stringstream error_msg; - error_msg << " has not implements get_block"; - return Status::NotSupported(error_msg.str()); - } + virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block, + SourceState& result_state) { Review Comment: warning: parameter 'block' is unused [misc-unused-parameters] ```suggestion virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* /*block*/, ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -96,20 +171,14 @@ virtual bool can_write() { return false; } // for sink // for pipeline - virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state, - [[maybe_unused]] vectorized::Block* block, - [[maybe_unused]] SourceState& result_state) { - std::stringstream error_msg; - error_msg << " has not implements get_block"; - return Status::NotSupported(error_msg.str()); - } + virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block, + SourceState& result_state) { + return Status::OK(); Review Comment: warning: parameter 'result_state' is unused [misc-unused-parameters] ```suggestion SourceState& /*result_state*/) { ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr
build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion Status prepare(RuntimeState* state) override { ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState
source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - 
// create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = 
std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override 
{ + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, Review Comment: warning: 
'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return 
_sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _node->sink(state, in_block, source_state == SourceState::FINISHED); + } + + virtual Status close(RuntimeState* state) override { + 
_fresh_exec_timer(_node); + if (!_node->decrease_ref()) { + _node->release_resource(state); + } + return Status::OK(); + } + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + bool eos = false; + RETURN_IF_ERROR(_node->pull(state, block, &eos)); + source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; + return Status::OK(); + } + + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } Review Comment: warning: parameter 'state' is unused [misc-unused-parameters] ```suggestion /*state*/ ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status 
open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + 
_node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _node->sink(state, in_block, source_state == SourceState::FINISHED); + } + + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_node); + if (!_node->decrease_ref()) { + _node->release_resource(state); + } + return Status::OK(); + } + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = 
std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new 
RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _node->sink(state, in_block, source_state == SourceState::FINISHED); + } + + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_node); + if (!_node->decrease_ref()) { + _node->release_resource(state); + } + return Status::OK(); + } + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + bool eos = false; + RETURN_IF_ERROR(_node->pull(state, block, &eos)); + source_state = eos ? 
SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; + return Status::OK(); + } + + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } + + virtual bool can_read() override { return _node->can_read(); } protected: - const int32_t _id; - const std::string _name; - ExecNode* _related_exec_node; + void _fresh_exec_timer(NodeType* node) { + node->runtime_profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - RuntimeState* _state = nullptr; - bool _is_closed = false; + NodeType* _node; }; -using OperatorBuilderPtr = std::shared_ptr<OperatorBuilder>; -using OperatorBuilders = std::vector<OperatorBuilderPtr>; +template <typename OperatorBuilderType> +class DataStateOperator : public Operator<OperatorBuilderType> { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + DataStateOperator(OperatorBuilderBase* builder, ExecNode* node) + : Operator<OperatorBuilderType>(builder, node), + _child_block(new vectorized::Block), + _child_source_state(SourceState::DEPEND_ON_SOURCE) {}; + + virtual ~DataStateOperator() = default; + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion Status get_block(RuntimeState* state, vectorized::Block* block, ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder),
_sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; Review Comment: warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override] ```suggestion )) {};override ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -96,20 +171,14 @@ virtual bool can_write() { return false; } // for sink // for pipeline - virtual Status get_block([[maybe_unused]] RuntimeState* runtime_state, - [[maybe_unused]] vectorized::Block* block, - [[maybe_unused]] SourceState& result_state) { - std::stringstream error_msg; - error_msg << " has not implements get_block"; - return Status::NotSupported(error_msg.str()); - } + virtual Status get_block(RuntimeState* runtime_state, vectorized::Block* block, Review Comment: warning: parameter 'runtime_state' is unused [misc-unused-parameters] ```suggestion ipeline /*runtime_state*/ ``` ########## be/src/pipeline/exec/operator.cpp: ########## @@ -75,13 +52,13 @@ /////////////////////////////////////// OperatorBuilder //////////////////////////////////////////////////////////// -Status OperatorBuilder::prepare(doris::RuntimeState* state) { +Status OperatorBuilderBase::prepare(doris::RuntimeState* state) { _state = state; // runtime filter, now dispose by NewOlapScanNode return Status::OK(); } -void OperatorBuilder::close(doris::RuntimeState* state) { +void OperatorBuilderBase::close(doris::RuntimeState* state) { Review Comment: warning: parameter 'state' is unused [misc-unused-parameters] ```suggestion void OperatorBuilderBase::close(doris::RuntimeState* /*state*/) { ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), 
_related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) 
+ : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using 
NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void 
_fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; Review Comment: warning: prefer using 'override' or (rarely) 'final' instead of 'virtual' [modernize-use-override] ```suggestion )) {};override ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // 
create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if 
(!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { Review Comment: warning: parameter 'state' is unused [misc-unused-parameters] ```suggestion fault; /*state*/ ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* 
builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } Review Comment: warning: parameter 'state' is unused [misc-unused-parameters] ```suggestion /*state*/ ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class 
DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return 
_state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _node->sink(state, in_block, source_state == SourceState::FINISHED); + } + + virtual Status close(RuntimeState* state) override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> 
+class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { 
return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _node->sink(state, in_block, source_state == SourceState::FINISHED); + } + + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_node); + if (!_node->decrease_ref()) { + _node->release_resource(state); + } + return Status::OK(); + } + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + bool eos = 
false; + RETURN_IF_ERROR(_node->pull(state, block, &eos)); + source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; + return Status::OK(); + } + + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* 
in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), _node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { + _runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _node->runtime_profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + _node->increase_ref(); + return Status::OK(); + } + + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(_node->alloc_resource(state)); + return Status::OK(); + } + + virtual Status sink(RuntimeState* state, 
vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _node->sink(state, in_block, source_state == SourceState::FINISHED); + } + + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_node); + if (!_node->decrease_ref()) { + _node->release_resource(state); + } + return Status::OK(); + } + + virtual Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + bool eos = false; + RETURN_IF_ERROR(_node->pull(state, block, &eos)); + source_state = eos ? SourceState::FINISHED : SourceState::DEPEND_ON_SOURCE; + return Status::OK(); + } + + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } + + virtual bool can_read() override { return _node->can_read(); } Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion K(); } ``` ########## be/src/pipeline/exec/operator.h: ########## @@ -155,44 +222,163 @@ bool _is_closed = false; }; -class OperatorBuilder { +template <typename OperatorBuilderType> +class DataSinkOperator : public OperatorBase { public: - OperatorBuilder(int32_t id, const std::string& name, ExecNode* exec_node = nullptr) - : _id(id), _name(name), _related_exec_node(exec_node) {} + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; - virtual ~OperatorBuilder() = default; + DataSinkOperator(OperatorBuilderBase* builder, DataSink* sink) + : OperatorBase(builder), _sink(reinterpret_cast<NodeType*>(sink)) {}; - virtual OperatorPtr build_operator() = 0; + virtual ~DataSinkOperator() = default; - virtual bool is_sink() const { return false; } - virtual bool is_source() const { return false; } + virtual Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_sink->prepare(state)); + 
_runtime_profile.reset(new RuntimeProfile(_operator_builder->get_name())); + _sink->profile()->insert_child_head(_runtime_profile.get(), true); + _mem_tracker = std::make_unique<MemTracker>("Operator:" + _runtime_profile->name(), + _runtime_profile.get()); + return Status::OK(); + } - // create the object used by all operator - virtual Status prepare(RuntimeState* state); + virtual Status open(RuntimeState* state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + return _sink->open(state); + } - // destory the object used by all operator - virtual void close(RuntimeState* state); + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + if (!UNLIKELY(in_block)) { + DCHECK(source_state == SourceState::FINISHED) + << "block is null, eos should invoke in finalize."; + return Status::OK(); + } + return _sink->send(state, in_block, source_state == SourceState::FINISHED); + } - std::string get_name() const { return _name; } + virtual Status close(RuntimeState* state) override { + _fresh_exec_timer(_sink); + return _sink->close(state, Status::OK()); + } - RuntimeState* runtime_state() { return _state; } + virtual Status finalize(RuntimeState* state) override { return Status::OK(); } - const RowDescriptor& row_desc() { return _related_exec_node->row_desc(); } +protected: + void _fresh_exec_timer(NodeType* node) { + node->profile()->total_time_counter()->update( + _runtime_profile->total_time_counter()->value()); + } - ExecNode* exec_node() const { return _related_exec_node; } + NodeType* _sink; +}; - int32_t id() const { return _id; } +template <typename OperatorBuilderType> +class Operator : public OperatorBase { +public: + using NodeType = + std::remove_pointer_t<decltype(std::declval<OperatorBuilderType>().exec_node())>; + + Operator(OperatorBuilderBase* builder, ExecNode* node) + : OperatorBase(builder), 
_node(reinterpret_cast<NodeType*>(node)) {}; + + virtual ~Operator() = default; + + virtual Status prepare(RuntimeState* state) override { Review Comment: warning: 'virtual' is redundant since the function is already declared 'override' [modernize-use-override] ```suggestion Status prepare(RuntimeState* state) override { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org