Gabriel39 commented on code in PR #23750: URL: https://github.com/apache/doris/pull/23750#discussion_r1312690184
########## be/src/pipeline/exec/repeat_operator.h: ########## @@ -45,5 +45,56 @@ class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> { Status close(RuntimeState* state) override; }; +class RepeatLocalState final : public PipelineXLocalState<FakeDependency> { +public: + ENABLE_FACTORY_CREATOR(RepeatLocalState); + using Base = PipelineXLocalState<FakeDependency>; + RepeatLocalState(RuntimeState* state, OperatorXBase* parent); + + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status close(RuntimeState* state) override; + +private: + friend class RepeatOperatorX; + std::unique_ptr<vectorized::Block> _child_block; + SourceState _child_source_state; + bool _child_eos; + int _repeat_id_idx; + std::unique_ptr<vectorized::Block> _intermediate_block {}; +}; +class RepeatOperatorX final : public OperatorXBase { +public: + RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override; + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + +private: + friend class RepeatLocalState; + Status get_repeated_block(vectorized::Block* child_block, int repeat_id_idx, + vectorized::Block* output_block); + bool need_more_input_data(RuntimeState* state) const; + + Status pull(RuntimeState* state, vectorized::Block* output_block, SourceState& source_state); + Status push(RuntimeState* state, vectorized::Block* input_block, SourceState& source_state); + // Slot id set used to indicate those slots need to set to null. + std::vector<std::set<SlotId>> _slot_id_set_list; + // all slot id + std::set<SlotId> _all_slot_ids; + // An integer bitmap list, it indicates the bit position of the exprs not null. + std::vector<int64_t> _repeat_id_list; + std::vector<std::vector<int64_t>> _grouping_list; + TupleId _output_tuple_id; + const TupleDescriptor* _output_tuple_desc; + + std::vector<SlotDescriptor*> _output_slots; + + vectorized::VExprContextSPtrs _expr_ctxs; Review Comment: Also need this in local state ########## be/src/pipeline/exec/repeat_operator.h: ########## @@ -45,5 +45,56 @@ class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> { Status close(RuntimeState* state) override; }; +class RepeatLocalState final : public PipelineXLocalState<FakeDependency> { +public: + ENABLE_FACTORY_CREATOR(RepeatLocalState); + using Base = PipelineXLocalState<FakeDependency>; + RepeatLocalState(RuntimeState* state, OperatorXBase* parent); + + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status close(RuntimeState* state) override; + +private: + friend class RepeatOperatorX; + std::unique_ptr<vectorized::Block> _child_block; + SourceState _child_source_state; + bool _child_eos; + int _repeat_id_idx; + std::unique_ptr<vectorized::Block> _intermediate_block {}; +}; +class RepeatOperatorX final : public OperatorXBase { Review Comment: ```suggestion class RepeatOperatorX final : public OperatorX<RepeatLocalState> { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org