Gabriel39 commented on code in PR #47524:
URL: https://github.com/apache/doris/pull/47524#discussion_r1962921259


##########
be/src/runtime/runtime_state.cpp:
##########
@@ -490,26 +490,24 @@ RuntimeFilterMgr* 
RuntimeState::global_runtime_filter_mgr() {
 }
 
 Status RuntimeState::register_producer_runtime_filter(
-        const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* 
producer_filter) {
+        const TRuntimeFilterDesc& desc, 
std::shared_ptr<RuntimeFilterProducer>* producer_filter,
+        RuntimeProfile* parent_profile) {
     // Producers are created by local runtime filter mgr and shared by global 
runtime filter manager.
     // When RF is published, consumers in both global and local RF mgr will be 
found.
-    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, 
query_options(),
-                                                                         
producer_filter));
-    
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
-            desc, query_options(), *producer_filter));
+    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
+            desc, query_options(), producer_filter, parent_profile));
+    
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_filter(desc, 
query_options(),

Review Comment:
   Pass `parent_profile` here as well, to collect the profile for `local_merger_filter`.



##########
be/src/runtime_filter/runtime_filter_slots.cpp:
##########
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_slots.h"
+
+#include "pipeline/pipeline_task.h"
+#include "runtime_filter/role/producer.h"
+#include "runtime_filter/runtime_filter_wrapper.h"
+#include "util/defer_op.h"
+
+namespace doris {
+
+Status RuntimeFilterSlots::send_filter_size(
+        RuntimeState* state, uint64_t hash_table_size,
+        std::shared_ptr<pipeline::CountedFinishDependency> dependency) {
+    if (_skip_runtime_filters_process) {
+        return Status::OK();
+    }
+
+    dependency->add(); // add count at start to avoid dependency ready 
multiple times

Review Comment:
   Remove this line.



##########
be/src/runtime_filter/role/producer.cpp:
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/role/producer.h"
+
+#include <glog/logging.h>
+
+#include "runtime_filter/role/consumer.h"
+#include "runtime_filter/role/merger.h"
+#include "runtime_filter/runtime_filter_wrapper.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+
+namespace doris {
+
+Status RuntimeFilterProducer::_send_to_remote_targets(RuntimeState* state,
+                                                      RuntimeFilter* 
merger_filter) {
+    TNetworkAddress addr;
+    
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
+    return merger_filter->_push_to_remote(state, &addr);
+};
+
+Status RuntimeFilterProducer::_send_to_local_targets(RuntimeFilter* 
merger_filter, bool global) {
+    std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
+            global ? 
_state->global_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id())
+                   : 
_state->local_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id());
+    for (auto filter : filters) {
+        filter->signal(merger_filter);
+    }
+    return Status::OK();
+};
+
+Status RuntimeFilterProducer::publish(RuntimeState* state, bool publish_local) 
{
+    _check_state({State::READY_TO_PUBLISH});
+
+    auto do_merge = [&]() {
+        // two case we need do local merge:
+        // 1. has remote target
+        // 2. has local target and has global consumer (means target scan has 
local shuffle)
+        if (_has_local_target && _state->global_runtime_filter_mgr()
+                                         
->get_consume_filters(_wrapper->filter_id())
+                                         .empty()) {
+            // when global consumer not exist, send_to_local_targets will do 
nothing, so merge rf is useless
+            return Status::OK();
+        }
+        LocalMergeContext* local_merge_filters = nullptr;
+        
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
+                _wrapper->filter_id(), &local_merge_filters));
+        std::lock_guard l(local_merge_filters->mtx);
+        RETURN_IF_ERROR(local_merge_filters->merger->merge_from(this));
+        if (local_merge_filters->merger->ready()) {
+            if (_has_local_target) {
+                
RETURN_IF_ERROR(_send_to_local_targets(local_merge_filters->merger.get(), 
true));
+            } else {
+                RETURN_IF_ERROR(_send_to_remote_targets(state, 
local_merge_filters->merger.get()));
+            }
+        }
+        return Status::OK();
+    };
+
+    if (_has_local_target) {
+        // A runtime filter may have multiple targets and some of those are 
local-merge RF and others are not.
+        // So for all runtime filters' producers, `publish` should notify all 
consumers in global RF mgr which manages local-merge RF and local RF mgr which 
manages others.
+        RETURN_IF_ERROR(do_merge());
+        RETURN_IF_ERROR(_send_to_local_targets(this, false));
+    } else if (!publish_local) {
+        if (_is_broadcast_join) {
+            RETURN_IF_ERROR(_send_to_remote_targets(state, this));
+        } else {
+            RETURN_IF_ERROR(do_merge());
+        }
+    } else {
+        // remote broadcast join only push onetime in build shared hash table
+        // publish_local only set true on copy shared hash table
+        DCHECK(_is_broadcast_join);
+    }
+
+    _set_state(State::PUBLISHED);
+    return Status::OK();
+}
+
+class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
+                                                  
DummyBrpcCallback<PSendFilterSizeResponse>> {
+    std::shared_ptr<pipeline::Dependency> _dependency;
+    // Should use weak ptr here, because when query context deconstructs, 
should also delete runtime filter
+    // context, it not the memory is not released. And rpc is in another 
thread, it will hold rf context
+    // after query context because the rpc is not returned.
+    std::weak_ptr<RuntimeFilterWrapper> _wrapper;
+    using Base =
+            AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
+    ENABLE_FACTORY_CREATOR(SyncSizeClosure);
+
+    void _process_if_rpc_failed() override {
+        Defer defer {[&]() { 
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+        auto wrapper = _wrapper.lock();
+        if (!wrapper) {
+            return;
+        }
+
+        wrapper->disable(cntl_->ErrorText());
+        Base::_process_if_rpc_failed();
+    }
+
+    void _process_if_meet_error_status(const Status& status) override {
+        Defer defer {[&]() { 
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+        auto wrapper = _wrapper.lock();
+        if (!wrapper) {
+            return;
+        }
+
+        wrapper->disable(status.to_string());
+    }
+
+public:
+    SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
+                    
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
+                    std::shared_ptr<pipeline::Dependency> dependency,
+                    std::shared_ptr<RuntimeFilterWrapper> wrapper,
+                    std::weak_ptr<QueryContext> context)
+            : Base(req, callback, context), 
_dependency(std::move(dependency)), _wrapper(wrapper) {}
+};
+
+Status RuntimeFilterProducer::send_filter_size(
+        RuntimeState* state, uint64_t local_filter_size,
+        const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) {
+    if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
+        _check_state({State::WAITING_FOR_DATA});
+        return Status::OK();
+    }
+    _dependency = dependency;
+    _dependency->add();
+    _set_state(State::WAITING_FOR_SYNCED_SIZE);
+
+    // two case we need do local merge:
+    // 1. has remote target
+    // 2. has local target and has global consumer (means target scan has 
local shuffle)
+    if (_has_remote_target ||
+        
!_state->global_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id()).empty())
 {
+        LocalMergeContext* local_merge_filters = nullptr;
+        
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
+                _wrapper->filter_id(), &local_merge_filters));
+        std::lock_guard l(local_merge_filters->mtx);
+        if (local_merge_filters->merger->add_rf_size(local_filter_size)) {
+            if (_has_local_target) {
+                for (auto filter : local_merge_filters->producers) {
+                    
filter->set_synced_size(local_merge_filters->merger->get_received_sum_size());
+                }
+                return Status::OK();
+            } else {
+                local_filter_size = 
local_merge_filters->merger->get_received_sum_size();

Review Comment:
   `get_received_sum_size` will always return 0.



##########
be/src/runtime_filter/role/merger.h:
##########
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime_filter/role/runtime_filter.h"
+#include "runtime_filter/runtime_filter_definitions.h"
+
+namespace doris {
+
+class RuntimeFilterMerger : public RuntimeFilter {
+public:
+    static Status create(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
+                         std::shared_ptr<RuntimeFilterMerger>* res) {
+        *res = std::shared_ptr<RuntimeFilterMerger>(new 
RuntimeFilterMerger(state, desc));
+        vectorized::VExprContextSPtr build_ctx;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(desc->src_expr, 
build_ctx));
+        (*res)->_wrapper = std::make_shared<RuntimeFilterWrapper>(
+                build_ctx->root()->type().type, (*res)->_runtime_filter_type, 
desc->filter_id,
+                RuntimeFilterWrapper::State::IGNORED);
+        return Status::OK();
+    }
+
+    std::string debug_string() const override {
+        return fmt::format(
+                "Merger: ({}, expected_producer_num: {}, 
received_producer_num: {}, "
+                "received_rf_size_num: {}, received_sum_size: {})",
+                _debug_string(), _expected_producer_num, 
_received_producer_num,
+                _received_rf_size_num, _received_sum_size);
+    }
+
+    Status merge_from(const RuntimeFilter* other) {
+        _received_producer_num++;
+        DCHECK_GE(_expected_producer_num, _received_producer_num) << 
debug_string();
+        if (_received_producer_num == _expected_producer_num) {
+            _rf_state = State::READY;
+        }
+        if (_wrapper->get_state() == RuntimeFilterWrapper::State::IGNORED) {
+            _wrapper = other->_wrapper;
+            return Status::OK();
+        }
+        return _wrapper->merge(other->_wrapper.get());
+    }
+
+    void set_expected_producer_num(int num) { _expected_producer_num = num; }
+
+    bool add_rf_size(uint64_t size) {
+        _received_rf_size_num++;
+        DCHECK_GE(_expected_producer_num, _received_rf_size_num) << 
debug_string();
+        return (_received_rf_size_num == _expected_producer_num);

Review Comment:
   `_received_rf_size_num` could be an `int` instead of an atomic int.



##########
be/src/runtime_filter/role/merger.h:
##########
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime_filter/role/runtime_filter.h"
+#include "runtime_filter/runtime_filter_definitions.h"
+
+namespace doris {
+
+class RuntimeFilterMerger : public RuntimeFilter {
+public:
+    static Status create(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
+                         std::shared_ptr<RuntimeFilterMerger>* res) {
+        *res = std::shared_ptr<RuntimeFilterMerger>(new 
RuntimeFilterMerger(state, desc));
+        vectorized::VExprContextSPtr build_ctx;
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(desc->src_expr, 
build_ctx));
+        (*res)->_wrapper = std::make_shared<RuntimeFilterWrapper>(
+                build_ctx->root()->type().type, (*res)->_runtime_filter_type, 
desc->filter_id,
+                RuntimeFilterWrapper::State::IGNORED);
+        return Status::OK();
+    }
+
+    std::string debug_string() const override {
+        return fmt::format(
+                "Merger: ({}, expected_producer_num: {}, 
received_producer_num: {}, "
+                "received_rf_size_num: {}, received_sum_size: {})",
+                _debug_string(), _expected_producer_num, 
_received_producer_num,
+                _received_rf_size_num, _received_sum_size);
+    }
+
+    Status merge_from(const RuntimeFilter* other) {
+        _received_producer_num++;
+        DCHECK_GE(_expected_producer_num, _received_producer_num) << 
debug_string();
+        if (_received_producer_num == _expected_producer_num) {
+            _rf_state = State::READY;
+        }
+        if (_wrapper->get_state() == RuntimeFilterWrapper::State::IGNORED) {
+            _wrapper = other->_wrapper;
+            return Status::OK();
+        }
+        return _wrapper->merge(other->_wrapper.get());
+    }
+
+    void set_expected_producer_num(int num) { _expected_producer_num = num; }
+
+    bool add_rf_size(uint64_t size) {

Review Comment:
   This parameter `size` is not used.



##########
be/src/runtime_filter/role/producer.cpp:
##########
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/role/producer.h"
+
+#include <glog/logging.h>
+
+#include "runtime_filter/role/consumer.h"
+#include "runtime_filter/role/merger.h"
+#include "runtime_filter/runtime_filter_wrapper.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+
+namespace doris {
+
+Status RuntimeFilterProducer::_send_to_remote_targets(RuntimeState* state,
+                                                      RuntimeFilter* 
merger_filter) {
+    TNetworkAddress addr;
+    
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
+    return merger_filter->_push_to_remote(state, &addr);
+};
+
+Status RuntimeFilterProducer::_send_to_local_targets(RuntimeFilter* 
merger_filter, bool global) {
+    std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
+            global ? 
_state->global_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id())
+                   : 
_state->local_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id());
+    for (auto filter : filters) {
+        filter->signal(merger_filter);
+    }
+    return Status::OK();
+};
+
+Status RuntimeFilterProducer::publish(RuntimeState* state, bool publish_local) 
{
+    _check_state({State::READY_TO_PUBLISH});
+
+    auto do_merge = [&]() {
+        // two case we need do local merge:
+        // 1. has remote target
+        // 2. has local target and has global consumer (means target scan has 
local shuffle)
+        if (_has_local_target && _state->global_runtime_filter_mgr()
+                                         
->get_consume_filters(_wrapper->filter_id())
+                                         .empty()) {
+            // when global consumer not exist, send_to_local_targets will do 
nothing, so merge rf is useless
+            return Status::OK();
+        }
+        LocalMergeContext* local_merge_filters = nullptr;
+        
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
+                _wrapper->filter_id(), &local_merge_filters));
+        std::lock_guard l(local_merge_filters->mtx);
+        RETURN_IF_ERROR(local_merge_filters->merger->merge_from(this));
+        if (local_merge_filters->merger->ready()) {
+            if (_has_local_target) {
+                
RETURN_IF_ERROR(_send_to_local_targets(local_merge_filters->merger.get(), 
true));
+            } else {
+                RETURN_IF_ERROR(_send_to_remote_targets(state, 
local_merge_filters->merger.get()));
+            }
+        }
+        return Status::OK();
+    };
+
+    if (_has_local_target) {
+        // A runtime filter may have multiple targets and some of those are 
local-merge RF and others are not.
+        // So for all runtime filters' producers, `publish` should notify all 
consumers in global RF mgr which manages local-merge RF and local RF mgr which 
manages others.
+        RETURN_IF_ERROR(do_merge());
+        RETURN_IF_ERROR(_send_to_local_targets(this, false));
+    } else if (!publish_local) {
+        if (_is_broadcast_join) {
+            RETURN_IF_ERROR(_send_to_remote_targets(state, this));
+        } else {
+            RETURN_IF_ERROR(do_merge());
+        }
+    } else {
+        // remote broadcast join only push onetime in build shared hash table
+        // publish_local only set true on copy shared hash table
+        DCHECK(_is_broadcast_join);
+    }
+
+    _set_state(State::PUBLISHED);
+    return Status::OK();
+}
+
+class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
+                                                  
DummyBrpcCallback<PSendFilterSizeResponse>> {
+    std::shared_ptr<pipeline::Dependency> _dependency;
+    // Should use weak ptr here, because when query context deconstructs, 
should also delete runtime filter
+    // context, it not the memory is not released. And rpc is in another 
thread, it will hold rf context
+    // after query context because the rpc is not returned.
+    std::weak_ptr<RuntimeFilterWrapper> _wrapper;
+    using Base =
+            AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
+    ENABLE_FACTORY_CREATOR(SyncSizeClosure);
+
+    void _process_if_rpc_failed() override {
+        Defer defer {[&]() { 
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+        auto wrapper = _wrapper.lock();
+        if (!wrapper) {
+            return;
+        }
+
+        wrapper->disable(cntl_->ErrorText());
+        Base::_process_if_rpc_failed();
+    }
+
+    void _process_if_meet_error_status(const Status& status) override {
+        Defer defer {[&]() { 
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+        auto wrapper = _wrapper.lock();
+        if (!wrapper) {
+            return;
+        }
+
+        wrapper->disable(status.to_string());
+    }
+
+public:
+    SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
+                    
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
+                    std::shared_ptr<pipeline::Dependency> dependency,
+                    std::shared_ptr<RuntimeFilterWrapper> wrapper,
+                    std::weak_ptr<QueryContext> context)
+            : Base(req, callback, context), 
_dependency(std::move(dependency)), _wrapper(wrapper) {}
+};
+
+Status RuntimeFilterProducer::send_filter_size(
+        RuntimeState* state, uint64_t local_filter_size,
+        const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) {
+    if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
+        _check_state({State::WAITING_FOR_DATA});
+        return Status::OK();
+    }
+    _dependency = dependency;
+    _dependency->add();
+    _set_state(State::WAITING_FOR_SYNCED_SIZE);
+
+    // two case we need do local merge:
+    // 1. has remote target
+    // 2. has local target and has global consumer (means target scan has 
local shuffle)
+    if (_has_remote_target ||
+        
!_state->global_runtime_filter_mgr()->get_consume_filters(_wrapper->filter_id()).empty())
 {
+        LocalMergeContext* local_merge_filters = nullptr;
+        
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
+                _wrapper->filter_id(), &local_merge_filters));
+        std::lock_guard l(local_merge_filters->mtx);
+        if (local_merge_filters->merger->add_rf_size(local_filter_size)) {
+            if (_has_local_target) {
+                for (auto filter : local_merge_filters->producers) {
+                    
filter->set_synced_size(local_merge_filters->merger->get_received_sum_size());
+                }
+                return Status::OK();
+            } else {
+                local_filter_size = 
local_merge_filters->merger->get_received_sum_size();
+            }
+        } else {
+            return Status::OK();
+        }
+
+    } else if (_has_local_target) {

Review Comment:
   `_has_local_target` should always be true here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to