HappenLee commented on code in PR #44850: URL: https://github.com/apache/doris/pull/44850#discussion_r1874621494
########## be/src/pipeline/exec/exchange_sink_buffer.h: ########## @@ -226,32 +278,43 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec; phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats; - std::atomic<bool> _is_finishing; + // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism. + // If an RPC error occurs, the query will be canceled. + std::atomic<bool> _is_failed; PUniqueId _query_id; PlanNodeId _dest_node_id; - // Sender instance id, unique within a fragment. StreamSender save the variable - int _sender_id; - int _be_number; std::atomic<int64_t> _rpc_count = 0; RuntimeState* _state = nullptr; QueryContext* _context = nullptr; Status _send_rpc(InstanceLoId); - // must hold the _instance_to_package_queue_mutex[id] mutex to opera - void _construct_request(InstanceLoId id, PUniqueId); + +#ifndef BE_TEST inline void _ended(InstanceLoId id); inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); - inline bool _is_receiver_eof(InstanceLoId id); inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock); + +#else + virtual void _ended(InstanceLoId id); + virtual void _failed(InstanceLoId id, const std::string& err); + virtual void _set_receiver_eof(InstanceLoId id); + virtual void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock); +#endif + void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); std::atomic<int> _total_queue_size = 0; - std::shared_ptr<Dependency> _queue_dependency = nullptr; - std::shared_ptr<Dependency> _finish_dependency = nullptr; - std::shared_ptr<Dependency> _broadcast_dependency = nullptr; - ExchangeSinkLocalState* _parent = nullptr; + + // _running_sink_count is used to track how many sinks have not finished yet. 
+ // It is only decremented when eos is reached. + phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count; Review Comment: Should we rethink this and merge these maps into one? They all seem to use the same key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org