This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 873b128fde [feature](pipeline) add intersect/except operators (#14868)
873b128fde is described below

commit 873b128fde04e45257dc73dd192475c5c6e37109
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Fri Dec 9 14:13:48 2022 +0800

    [feature](pipeline) add intersect/except operators (#14868)
---
 be/src/exec/exec_node.cpp                        |   3 +-
 be/src/exec/exec_node.h                          |   2 +
 be/src/pipeline/CMakeLists.txt                   |   6 +-
 be/src/pipeline/exec/operator.h                  |   4 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp |  63 ++++
 be/src/pipeline/exec/set_probe_sink_operator.h   |  67 ++++
 be/src/pipeline/exec/set_sink_operator.cpp       |  43 +++
 be/src/pipeline/exec/set_sink_operator.h         |  62 ++++
 be/src/pipeline/exec/set_source_operator.cpp     |  44 +++
 be/src/pipeline/exec/set_source_operator.h       |  57 +++
 be/src/pipeline/pipeline_fragment_context.cpp    |  35 ++
 be/src/pipeline/pipeline_fragment_context.h      |   8 +
 be/src/pipeline/pipeline_task.cpp                |   1 -
 be/src/vec/CMakeLists.txt                        |   2 -
 be/src/vec/exec/vexcept_node.cpp                 | 114 ------
 be/src/vec/exec/vexcept_node.h                   |  40 --
 be/src/vec/exec/vintersect_node.cpp              | 115 ------
 be/src/vec/exec/vintersect_node.h                |  50 ---
 be/src/vec/exec/vset_operation_node.cpp          | 445 +++++++++++++++++++----
 be/src/vec/exec/vset_operation_node.h            | 243 ++-----------
 20 files changed, 793 insertions(+), 611 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index cfbc9a50ca..5849b455cb 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -72,13 +72,12 @@
 #include "vec/exec/vbroker_scan_node.h"
 #include "vec/exec/vdata_gen_scan_node.h"
 #include "vec/exec/vempty_set_node.h"
-#include "vec/exec/vexcept_node.h"
 #include "vec/exec/vexchange_node.h"
-#include "vec/exec/vintersect_node.h"
 #include "vec/exec/vmysql_scan_node.h"
 #include "vec/exec/vrepeat_node.h"
 #include "vec/exec/vschema_scan_node.h"
 #include "vec/exec/vselect_node.h"
+#include "vec/exec/vset_operation_node.h"
 #include "vec/exec/vsort_node.h"
 #include "vec/exec/vtable_function_node.h"
 #include "vec/exec/vunion_node.h"
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index eaf367f6b9..0da7338c80 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -242,6 +242,8 @@ public:
 
     ExecNode* child(int i) { return _children[i]; }
 
+    // Number of child plan nodes of this node (the size of _children).
+    size_t children_count() const { return _children.size(); }
+
 protected:
     friend class DataSink;
 
diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt
index 38b01ddd20..659b357870 100644
--- a/be/src/pipeline/CMakeLists.txt
+++ b/be/src/pipeline/CMakeLists.txt
@@ -45,8 +45,10 @@ set(PIPELINE_FILES
         exec/sort_sink_operator.cpp
         exec/repeat_operator.cpp
         exec/table_function_operator.cpp
-        )
+        exec/set_sink_operator.cpp
+        exec/set_source_operator.cpp
+        exec/set_probe_sink_operator.cpp)
 
 add_library(Pipeline STATIC
         ${PIPELINE_FILES}
-        )
\ No newline at end of file
+        )
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index f1e29f0547..a17e8dd1f4 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -271,9 +271,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
         SCOPED_TIMER(_runtime_profile->total_time_counter());
-        if (!UNLIKELY(in_block)) {
-            DCHECK(source_state == SourceState::FINISHED)
-                    << "block is null, eos should invoke in finalize.";
+        if (UNLIKELY(!in_block || in_block->rows() == 0)) {
             return Status::OK();
         }
         return _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
new file mode 100644
index 0000000000..2f51edd12a
--- /dev/null
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "set_probe_sink_operator.h"
+
+#include "vec/exec/vset_operation_node.h"
+
+namespace doris::pipeline {
+
+// Stores the probe child index alongside the usual builder state so that every operator built
+// from this builder targets the same child of the set node.
+template <bool is_intersect>
+SetProbeSinkOperatorBuilder<is_intersect>::SetProbeSinkOperatorBuilder(int32_t id, int child_id,
+                                                                       ExecNode* set_node)
+        : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node),
+          _child_id(child_id) {}
+
+template <bool is_intersect>
+OperatorPtr SetProbeSinkOperatorBuilder<is_intersect>::build_operator() {
+    return std::make_shared<SetProbeSinkOperator<is_intersect>>(this, _child_id, this->_node);
+}
+
+template <bool is_intersect>
+SetProbeSinkOperator<is_intersect>::SetProbeSinkOperator(OperatorBuilderBase* operator_builder,
+                                                         int child_id, ExecNode* set_node)
+        : Operator<SetProbeSinkOperatorBuilder<is_intersect>>(operator_builder, set_node),
+          _child_id(child_id) {}
+
+// Hands `block` to VSetOperationNode::sink_probe as input of child `_child_id`. The last
+// argument is true when `source_state` is SourceState::FINISHED.
+template <bool is_intersect>
+Status SetProbeSinkOperator<is_intersect>::sink(RuntimeState* state, vectorized::Block* block,
+                                                SourceState source_state) {
+    return this->_node->sink_probe(state, _child_id, block, source_state == SourceState::FINISHED);
+}
+
+// Delegates to VSetOperationNode::finalize_probe for child `_child_id`.
+template <bool is_intersect>
+Status SetProbeSinkOperator<is_intersect>::finalize(RuntimeState* state) {
+    return this->_node->finalize_probe(state, _child_id);
+}
+
+// Probe children are consumed strictly in order: child i may only be written after the set node
+// reports child i - 1 as finished. Child 0 is never a probe child (checked by the DCHECK).
+template <bool is_intersect>
+bool SetProbeSinkOperator<is_intersect>::can_write() {
+    DCHECK_GT(_child_id, 0);
+    return this->_node->is_child_finished(_child_id - 1);
+}
+
+// Explicit instantiations: `true` for INTERSECT, `false` for EXCEPT.
+template class SetProbeSinkOperatorBuilder<true>;
+template class SetProbeSinkOperatorBuilder<false>;
+template class SetProbeSinkOperator<true>;
+template class SetProbeSinkOperator<false>;
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
new file mode 100644
index 0000000000..c4dc263803
--- /dev/null
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris {
+
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
+namespace pipeline {
+
+// Builds SetProbeSinkOperator instances for one probe child (child_id >= 1) of an
+// INTERSECT (is_intersect = true) or EXCEPT (is_intersect = false) node.
+template <bool is_intersect>
+class SetProbeSinkOperatorBuilder final
+        : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> {
+private:
+    constexpr static auto builder_name =
+            is_intersect ? "IntersectProbeSinkOperatorBuilder" : "ExceptProbeSinkOperatorBuilder";
+
+public:
+    SetProbeSinkOperatorBuilder(int32_t id, int child_id, ExecNode* set_node);
+    bool is_sink() const override { return true; }
+
+    OperatorPtr build_operator() override;
+
+private:
+    // Index of the set node's child whose pipeline this sink terminates.
+    int _child_id;
+};
+
+// Pipeline sink that feeds the rows of probe child `_child_id` into the shared set node.
+// can_write() holds the sink back until child `_child_id - 1` is reported finished.
+template <bool is_intersect>
+class SetProbeSinkOperator : public Operator<SetProbeSinkOperatorBuilder<is_intersect>> {
+public:
+    SetProbeSinkOperator(OperatorBuilderBase* operator_builder, int child_id, ExecNode* set_node);
+
+    bool can_write() override;
+
+    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override;
+    Status finalize(RuntimeState* state) override;
+    // open/close are no-ops for this operator.
+    Status open(RuntimeState* /*state*/) override { return Status::OK(); }
+    Status close(RuntimeState* /*state*/) override { return Status::OK(); }
+
+private:
+    int _child_id;
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
new file mode 100644
index 0000000000..aaa85c31f1
--- /dev/null
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "set_sink_operator.h"
+
+#include "vec/exec/vset_operation_node.h"
+
+namespace doris::pipeline {
+
+// The builder only records the operator id, its INTERSECT/EXCEPT-specific name and the set node.
+template <bool is_intersect>
+SetSinkOperatorBuilder<is_intersect>::SetSinkOperatorBuilder(int32_t id, ExecNode* set_node)
+        : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node) {
+}
+
+// Every built operator shares the builder's set node (`this->_node`).
+template <bool is_intersect>
+OperatorPtr SetSinkOperatorBuilder<is_intersect>::build_operator() {
+    return std::make_shared<SetSinkOperator<is_intersect>>(this, this->_node);
+}
+
+template <bool is_intersect>
+SetSinkOperator<is_intersect>::SetSinkOperator(
+        OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
+        : Operator<SetSinkOperatorBuilder<is_intersect>>(builder, set_node) {}
+
+// Explicit instantiations: `true` for INTERSECT, `false` for EXCEPT.
+template class SetSinkOperatorBuilder<true>;
+template class SetSinkOperatorBuilder<false>;
+template class SetSinkOperator<true>;
+template class SetSinkOperator<false>;
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
new file mode 100644
index 0000000000..eb6e5087fb
--- /dev/null
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris {
+
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
+namespace pipeline {
+
+// Builds the sink terminating the pipeline of child 0 (the build side) of an
+// INTERSECT (is_intersect = true) or EXCEPT (is_intersect = false) node.
+template <bool is_intersect>
+class SetSinkOperatorBuilder final
+        : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> {
+private:
+    constexpr static auto builder_name =
+            is_intersect ? "IntersectSinkOperatorBuilder" : "ExceptSinkOperatorBuilder";
+
+public:
+    SetSinkOperatorBuilder(int32_t id, ExecNode* set_node);
+    bool is_sink() const override { return true; }
+
+    OperatorPtr build_operator() override;
+};
+
+// Build-side sink of a set operation. The set node is reached through the Operator base
+// (`this->_node`); no separate (previously never-initialized) pointer is kept here.
+template <bool is_intersect>
+class SetSinkOperator : public Operator<SetSinkOperatorBuilder<is_intersect>> {
+public:
+    SetSinkOperator(OperatorBuilderBase* operator_builder,
+                    vectorized::VSetOperationNode<is_intersect>* set_node);
+
+    // The build side always reports itself writable.
+    bool can_write() override { return true; }
+
+    Status close(RuntimeState* /*state*/) override { return Status::OK(); }
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
new file mode 100644
index 0000000000..bb14936e83
--- /dev/null
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "set_source_operator.h"
+
+#include "vec/exec/vset_operation_node.h"
+
+namespace doris::pipeline {
+
+// The builder only records the operator id, its INTERSECT/EXCEPT-specific name and the set node.
+template <bool is_intersect>
+SetSourceOperatorBuilder<is_intersect>::SetSourceOperatorBuilder(int32_t id, ExecNode* set_node)
+        : OperatorBuilder<vectorized::VSetOperationNode<is_intersect>>(id, builder_name, set_node) {
+}
+
+// Every built operator shares the builder's set node (`this->_node`).
+template <bool is_intersect>
+OperatorPtr SetSourceOperatorBuilder<is_intersect>::build_operator() {
+    return std::make_shared<SetSourceOperator<is_intersect>>(this, this->_node);
+}
+
+template <bool is_intersect>
+SetSourceOperator<is_intersect>::SetSourceOperator(
+        OperatorBuilderBase* builder, vectorized::VSetOperationNode<is_intersect>* set_node)
+        : Operator<SetSourceOperatorBuilder<is_intersect>>(builder, set_node) {}
+
+// Explicit instantiations: `true` for INTERSECT, `false` for EXCEPT.
+template class SetSourceOperatorBuilder<true>;
+template class SetSourceOperatorBuilder<false>;
+template class SetSourceOperator<true>;
+template class SetSourceOperator<false>;
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h
new file mode 100644
index 0000000000..c870d15fa8
--- /dev/null
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "operator.h"
+
+namespace doris {
+
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
+namespace pipeline {
+
+// Builds the source operator that the consuming pipeline uses for an INTERSECT
+// (is_intersect = true) or EXCEPT (is_intersect = false) node. Marked `final` like the
+// sibling set-operation sink builders.
+template <bool is_intersect>
+class SetSourceOperatorBuilder final
+        : public OperatorBuilder<vectorized::VSetOperationNode<is_intersect>> {
+private:
+    constexpr static auto builder_name =
+            is_intersect ? "IntersectSourceOperatorBuilder" : "ExceptSourceOperatorBuilder";
+
+public:
+    SetSourceOperatorBuilder(int32_t id, ExecNode* set_node);
+    bool is_source() const override { return true; }
+
+    OperatorPtr build_operator() override;
+};
+
+// Source operator over the shared set node. Reading behaviour is inherited from the Operator
+// base (not shown here); open() is a no-op.
+template <bool is_intersect>
+class SetSourceOperator : public Operator<SetSourceOperatorBuilder<is_intersect>> {
+public:
+    SetSourceOperator(OperatorBuilderBase* builder,
+                      vectorized::VSetOperationNode<is_intersect>* set_node);
+
+    Status open(RuntimeState* /*state*/) override { return Status::OK(); }
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 8c5cc7b7bb..f2deb1375a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -33,6 +33,9 @@
 #include "exec/result_sink_operator.h"
 #include "exec/scan_node.h"
 #include "exec/scan_operator.h"
+#include "exec/set_probe_sink_operator.h"
+#include "exec/set_sink_operator.h"
+#include "exec/set_source_operator.h"
 #include "exec/sort_sink_operator.h"
 #include "exec/sort_source_operator.h"
 #include "exec/streaming_aggregation_sink_operator.h"
@@ -56,6 +59,7 @@
 #include "vec/exec/vaggregation_node.h"
 #include "vec/exec/vexchange_node.h"
 #include "vec/exec/vrepeat_node.h"
+#include "vec/exec/vset_operation_node.h"
 #include "vec/exec/vsort_node.h"
 #include "vec/sink/vresult_sink.h"
 
@@ -382,6 +386,14 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
         cur_pipe->add_dependency(new_pipe);
         break;
     }
+    case TPlanNodeType::INTERSECT_NODE: {
+        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(node, 
cur_pipe));
+        break;
+    }
+    case TPlanNodeType::EXCEPT_NODE: {
+        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(node, 
cur_pipe));
+        break;
+    }
     default:
         return Status::InternalError("Unsupported exec type in pipeline: {}",
                                      print_plan_node_type(node_type));
@@ -389,6 +401,29 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
     return Status::OK();
 }
 
+// Lowers an INTERSECT (is_intersect = true) or EXCEPT (is_intersect = false) node:
+//   * child 0 runs in its own pipeline terminated by a SetSinkOperator (build side);
+//   * every child i >= 1 runs in its own pipeline terminated by a SetProbeSinkOperator;
+//   * `cur_pipe` reads the result through a SetSourceOperator.
+// `cur_pipe` is made to depend on all of those pipelines, mirroring the
+// `cur_pipe->add_dependency(...)` call of the other blocking cases in _build_pipelines, so the
+// source is gated on every child pipeline (relies on Pipeline dependency semantics, as they do).
+template <bool is_intersect>
+Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode* node,
+                                                                        PipelinePtr cur_pipe) {
+    auto build_pipeline = add_pipeline();
+    RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
+    OperatorBuilderPtr sink_builder = std::make_shared<SetSinkOperatorBuilder<is_intersect>>(
+            next_operator_builder_id(), node);
+    RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
+    cur_pipe->add_dependency(build_pipeline);
+
+    // children_count() is a size_t while child()/child ids are int; convert once up front to
+    // avoid a signed/unsigned comparison in the loop condition.
+    const int children_count = static_cast<int>(node->children_count());
+    for (int child_id = 1; child_id < children_count; ++child_id) {
+        auto probe_pipeline = add_pipeline();
+        RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline));
+        OperatorBuilderPtr probe_sink_builder =
+                std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(
+                        next_operator_builder_id(), child_id, node);
+        RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
+        cur_pipe->add_dependency(probe_pipeline);
+    }
+
+    OperatorBuilderPtr source_builder = std::make_shared<SetSourceOperatorBuilder<is_intersect>>(
+            next_operator_builder_id(), node);
+    return cur_pipe->add_operator(source_builder);
+}
+
 Status PipelineFragmentContext::submit() {
     if (_submitted) {
         return Status::InternalError("submitted");
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index dc88feebf3..fd1d50ebff 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -24,6 +24,11 @@ namespace doris {
 class ExecNode;
 class DataSink;
 
+namespace vectorized {
+template <bool is_intersect>
+class VSetOperationNode;
+}
+
 namespace pipeline {
 
 class PipelineTask;
@@ -121,6 +126,9 @@ private:
     Status _create_sink(const TDataSink& t_data_sink);
     Status _build_pipelines(ExecNode*, PipelinePtr);
     Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& 
request);
+
+    template <bool is_intersect>
+    Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index fbe1d6b819..6e867d5959 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -146,7 +146,6 @@ Status PipelineTask::execute(bool* eos) {
                 break;
             }
         }
-        *eos = false;
     }
 
     return Status::OK();
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 8dfd11ade4..ca5c9c17f5 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -94,8 +94,6 @@ set(VEC_FILES
   exec/vexchange_node.cpp
   exec/vset_operation_node.cpp
   exec/vunion_node.cpp
-  exec/vintersect_node.cpp
-  exec/vexcept_node.cpp
   exec/vselect_node.cpp
   exec/vmysql_scan_node.cpp
   exec/vschema_scan_node.cpp
diff --git a/be/src/vec/exec/vexcept_node.cpp b/be/src/vec/exec/vexcept_node.cpp
deleted file mode 100644
index 8cf391f72f..0000000000
--- a/be/src/vec/exec/vexcept_node.cpp
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/vexcept_node.h"
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-#include "vec/core/block.h"
-#include "vec/exec/vset_operation_node.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-namespace doris {
-namespace vectorized {
-
-VExceptNode::VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
-        : VSetOperationNode(pool, tnode, descs) {}
-
-Status VExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    RETURN_IF_ERROR(VSetOperationNode::init(tnode, state));
-    DCHECK(tnode.__isset.except_node);
-    return Status::OK();
-}
-
-Status VExceptNode::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(VSetOperationNode::prepare(state));
-    return Status::OK();
-}
-
-Status VExceptNode::open(RuntimeState* state) {
-    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::open");
-    RETURN_IF_ERROR(VSetOperationNode::open(state));
-    bool eos = false;
-    Status st = Status::OK();
-    for (int i = 1; i < _children.size(); ++i) {
-        if (i > 1) {
-            refresh_hash_table<false>();
-        }
-
-        RETURN_IF_ERROR(child(i)->open(state));
-        eos = false;
-        int probe_expr_ctxs_sz = _child_expr_lists[i].size();
-        _probe_columns.resize(probe_expr_ctxs_sz);
-
-        while (!eos) {
-            RETURN_IF_ERROR(process_probe_block(state, i, &eos));
-            if (_probe_rows == 0) continue;
-
-            std::visit(
-                    [&](auto&& arg) {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                            HashTableProbe<HashTableCtxType, false> 
process_hashtable_ctx(
-                                    this, state->batch_size(), _probe_rows);
-                            st = 
process_hashtable_ctx.mark_data_in_hashtable(arg);
-
-                        } else {
-                            LOG(FATAL) << "FATAL: uninited hash table";
-                        }
-                    },
-                    *_hash_table_variants);
-        }
-    }
-    return st;
-}
-
-Status VExceptNode::get_next(RuntimeState* state, Block* output_block, bool* 
eos) {
-    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, 
"VExceptNode::get_next");
-    SCOPED_TIMER(_probe_timer);
-    Status st;
-    create_mutable_cols(output_block);
-
-    std::visit(
-            [&](auto&& arg) {
-                using HashTableCtxType = std::decay_t<decltype(arg)>;
-                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                    HashTableProbe<HashTableCtxType, false> 
process_hashtable_ctx(
-                            this, state->batch_size(), _probe_rows);
-                    st = process_hashtable_ctx.get_data_in_hashtable(arg, 
_mutable_cols,
-                                                                     
output_block, eos);
-                } else {
-                    LOG(FATAL) << "FATAL: uninited hash table";
-                }
-            },
-            *_hash_table_variants);
-
-    RETURN_IF_ERROR(
-            VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, 
output_block->columns()));
-    reached_limit(output_block, eos);
-
-    return st;
-}
-
-Status VExceptNode::close(RuntimeState* state) {
-    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VExceptNode::close");
-    return VSetOperationNode::close(state);
-}
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/vexcept_node.h b/be/src/vec/exec/vexcept_node.h
deleted file mode 100644
index 7bbed571e7..0000000000
--- a/be/src/vec/exec/vexcept_node.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "vec/exec/vset_operation_node.h"
-
-namespace doris {
-namespace vectorized {
-
-class VExceptNode : public VSetOperationNode {
-public:
-    VExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& 
descs);
-    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
-    virtual Status prepare(RuntimeState* state);
-    virtual Status open(RuntimeState* state);
-    using VSetOperationNode::get_next;
-    virtual Status get_next(RuntimeState* state, vectorized::Block* 
output_block, bool* eos);
-    virtual Status close(RuntimeState* state);
-
-private:
-    template <class HashTableContext, bool is_intersected>
-    friend struct HashTableProbe;
-};
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/vintersect_node.cpp b/be/src/vec/exec/vintersect_node.cpp
deleted file mode 100644
index b232708533..0000000000
--- a/be/src/vec/exec/vintersect_node.cpp
+++ /dev/null
@@ -1,115 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/vintersect_node.h"
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/runtime_state.h"
-#include "util/runtime_profile.h"
-#include "vec/core/block.h"
-#include "vec/exec/vset_operation_node.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-namespace doris {
-namespace vectorized {
-
-VIntersectNode::VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
-        : VSetOperationNode(pool, tnode, descs) {}
-
-Status VIntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    RETURN_IF_ERROR(VSetOperationNode::init(tnode, state));
-    DCHECK(tnode.__isset.intersect_node);
-    return Status::OK();
-}
-
-Status VIntersectNode::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(VSetOperationNode::prepare(state));
-    return Status::OK();
-}
-
-Status VIntersectNode::open(RuntimeState* state) {
-    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::open");
-    RETURN_IF_ERROR(VSetOperationNode::open(state));
-    bool eos = false;
-    Status st = Status::OK();
-
-    for (int i = 1; i < _children.size(); ++i) {
-        if (i > 1) {
-            refresh_hash_table<true>();
-        }
-
-        _valid_element_in_hash_tbl = 0;
-        RETURN_IF_ERROR(child(i)->open(state));
-        eos = false;
-        _probe_columns.resize(_child_expr_lists[i].size());
-
-        while (!eos) {
-            RETURN_IF_ERROR(process_probe_block(state, i, &eos));
-            if (_probe_rows == 0) continue;
-
-            std::visit(
-                    [&](auto&& arg) {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                            HashTableProbe<HashTableCtxType, true> 
process_hashtable_ctx(
-                                    this, state->batch_size(), _probe_rows);
-                            st = 
process_hashtable_ctx.mark_data_in_hashtable(arg);
-
-                        } else {
-                            LOG(FATAL) << "FATAL: uninited hash table";
-                        }
-                    },
-                    *_hash_table_variants);
-        }
-    }
-    return st;
-}
-
-Status VIntersectNode::get_next(RuntimeState* state, Block* output_block, 
bool* eos) {
-    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, 
"VIntersectNode::get_next");
-    SCOPED_TIMER(_probe_timer);
-    create_mutable_cols(output_block);
-    Status st;
-
-    std::visit(
-            [&](auto&& arg) {
-                using HashTableCtxType = std::decay_t<decltype(arg)>;
-                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                    HashTableProbe<HashTableCtxType, true> 
process_hashtable_ctx(
-                            this, state->batch_size(), _probe_rows);
-                    st = process_hashtable_ctx.get_data_in_hashtable(arg, 
_mutable_cols,
-                                                                     
output_block, eos);
-
-                } else {
-                    LOG(FATAL) << "FATAL: uninited hash table";
-                }
-            },
-            *_hash_table_variants);
-
-    RETURN_IF_ERROR(
-            VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, 
output_block->columns()));
-    reached_limit(output_block, eos);
-
-    return st;
-}
-
-Status VIntersectNode::close(RuntimeState* state) {
-    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VIntersectNode::close");
-    return VSetOperationNode::close(state);
-}
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/vintersect_node.h b/be/src/vec/exec/vintersect_node.h
deleted file mode 100644
index 1316fcd761..0000000000
--- a/be/src/vec/exec/vintersect_node.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/exec_node.h"
-#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_table.h"
-#include "vec/core/materialize_block.h"
-#include "vec/exec/join/join_op.h"
-#include "vec/exec/join/vacquire_list.hpp"
-#include "vec/exec/join/vhash_join_node.h"
-#include "vec/exec/vset_operation_node.h"
-#include "vec/functions/function.h"
-#include "vec/utils/util.hpp"
-
-namespace doris {
-namespace vectorized {
-
-class VExprContext;
-class VIntersectNode : public VSetOperationNode {
-public:
-    VIntersectNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
-    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
-    virtual Status prepare(RuntimeState* state);
-    virtual Status open(RuntimeState* state);
-    using VSetOperationNode::get_next;
-    virtual Status get_next(RuntimeState* state, vectorized::Block* 
output_block, bool* eos);
-    virtual Status close(RuntimeState* state);
-
-private:
-    template <class HashTableContext, bool is_intersected>
-    friend struct HashTableProbe;
-};
-} // namespace vectorized
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index d3e9c436d6..800b9aae6c 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -23,13 +23,12 @@ namespace doris {
 namespace vectorized {
 
 //build hash table for operation node, intersect/except node
-template <class HashTableContext>
+template <class HashTableContext, bool is_intersect>
 struct HashTableBuild {
-    HashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& 
build_raw_ptrs,
-                   VSetOperationNode* operation_node, uint8_t offset)
+    HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs,
+                   VSetOperationNode<is_intersect>* operation_node, uint8_t 
offset)
             : _rows(rows),
               _offset(offset),
-              _acquired_block(acquired_block),
               _build_raw_ptrs(build_raw_ptrs),
               _operation_node(operation_node) {}
 
@@ -60,7 +59,6 @@ struct HashTableBuild {
 
             if (emplace_result.is_inserted()) { //only inserted once as the 
same key, others skip
                 new (&emplace_result.get_mapped()) Mapped({k, _offset});
-                _operation_node->_valid_element_in_hash_tbl++;
             }
         }
         return Status::OK();
@@ -69,35 +67,95 @@ struct HashTableBuild {
 private:
     const int _rows;
     const uint8_t _offset;
-    Block& _acquired_block;
     ColumnRawPtrs& _build_raw_ptrs;
-    VSetOperationNode* _operation_node;
+    VSetOperationNode<is_intersect>* _operation_node;
 };
 
-VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
-                                     const DescriptorTbl& descs)
+template <class HashTableContext, bool is_intersected>
+struct HashTableProbe {
+    HashTableProbe(VSetOperationNode<is_intersected>* operation_node, int 
probe_rows)
+            : _operation_node(operation_node),
+              _probe_rows(probe_rows),
+              _probe_raw_ptrs(operation_node->_probe_columns),
+              _arena(new Arena) {}
+
+    Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
+        using KeyGetter = typename HashTableContext::State;
+        using Mapped = typename HashTableContext::Mapped;
+
+        KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz, 
nullptr);
+        if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
+            if (_probe_keys.size() < _probe_rows) {
+                _probe_keys.resize(_probe_rows);
+            }
+            size_t keys_size = _probe_raw_ptrs.size();
+            for (size_t i = 0; i < _probe_rows; ++i) {
+                _probe_keys[i] =
+                        serialize_keys_to_pool_contiguous(i, keys_size, 
_probe_raw_ptrs, *_arena);
+            }
+            key_getter.set_serialized_keys(_probe_keys.data());
+        }
+
+        if constexpr (std::is_same_v<typename HashTableContext::Mapped, 
RowRefListWithFlags>) {
+            for (int probe_index = 0; probe_index < _probe_rows; 
probe_index++) {
+                auto find_result =
+                        key_getter.find_key(hash_table_ctx.hash_table, 
probe_index, *_arena);
+                if (find_result.is_found()) { //if found, marked visited
+                    auto it = find_result.get_mapped().begin();
+                    if (!(it->visited)) {
+                        it->visited = true;
+                        if constexpr (is_intersected) { //intersected
+                            _operation_node->_valid_element_in_hash_tbl++;
+                        } else {
+                            _operation_node->_valid_element_in_hash_tbl--; 
//except
+                        }
+                    }
+                }
+            }
+        } else {
+            LOG(FATAL) << "Invalid RowRefListType!";
+        }
+        return Status::OK();
+    }
+
+private:
+    VSetOperationNode<is_intersected>* _operation_node;
+    const size_t _probe_rows;
+    ColumnRawPtrs& _probe_raw_ptrs;
+    std::unique_ptr<Arena> _arena;
+    std::vector<StringRef> _probe_keys;
+};
+
+template <bool is_intersect>
+VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const 
TPlanNode& tnode,
+                                                   const DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
           _valid_element_in_hash_tbl(0),
           _mem_used(0),
-          _probe_index(-1),
-          _probe_rows(0) {
+          _build_block_index(0),
+          _build_finished(false) {
     _hash_table_variants = std::make_unique<HashTableVariants>();
     _arena = std::make_unique<Arena>();
 }
 
-Status VSetOperationNode::close(RuntimeState* state) {
-    if (is_closed()) {
-        return Status::OK();
-    }
-    START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VSetOperationNode::close");
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::release_resource(RuntimeState* state) {
     for (auto& exprs : _child_expr_lists) {
         VExpr::close(exprs, state);
     }
     release_mem();
+}
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VSetOperationNode<is_intersect>::close");
     return ExecNode::close(state);
 }
 
-Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::init(const TPlanNode& tnode, 
RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
     std::vector<std::vector<::doris::TExpr>> result_texpr_lists;
 
@@ -119,20 +177,71 @@ Status VSetOperationNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-Status VSetOperationNode::open(RuntimeState* state) {
-    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VSetOperationNode::open");
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) {
     // open result expr lists.
     for (const std::vector<VExprContext*>& exprs : _child_expr_lists) {
         RETURN_IF_ERROR(VExpr::open(exprs, state));
     }
-    RETURN_IF_ERROR(hash_table_build(state));
+    _probe_finished_children_index.assign(_child_expr_lists.size(), false);
+    _probe_columns.resize(_child_expr_lists[1].size());
     return Status::OK();
 }
 
-Status VSetOperationNode::prepare(RuntimeState* state) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, 
"VSetOperationNode<is_intersect>::open");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_ERROR(ExecNode::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
+
+    // TODO: build the hash table in a thread to open other children 
asynchronously.
+    RETURN_IF_ERROR(hash_table_build(state));
+    bool eos = false;
+    Status st = Status::OK();
+    for (int i = 1; i < _children.size(); ++i) {
+        RETURN_IF_ERROR(child(i)->open(state));
+        eos = false;
+        int probe_expr_ctxs_sz = _child_expr_lists[i].size();
+        _probe_columns.resize(probe_expr_ctxs_sz);
+
+        if constexpr (is_intersect) {
+            _valid_element_in_hash_tbl = 0;
+        } else {
+            std::visit(
+                    [&](auto&& arg) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                            _valid_element_in_hash_tbl = arg.hash_table.size();
+                        }
+                    },
+                    *_hash_table_variants);
+        }
+
+        while (!eos) {
+            release_block_memory(_probe_block, i);
+            RETURN_IF_CANCELLED(state);
+            RETURN_IF_ERROR_AND_CHECK_SPAN(
+                    child(i)->get_next_after_projects(state, &_probe_block, 
&eos),
+                    child(i)->get_next_span(), eos);
+
+            RETURN_IF_ERROR(sink_probe(state, i, &_probe_block, eos));
+        }
+        finalize_probe(state, i);
+    }
+    return st;
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::get_next(RuntimeState* state, Block* 
output_block,
+                                                 bool* eos) {
+    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, 
"VExceptNode::get_next");
+    SCOPED_TIMER(_probe_timer);
+    return pull(state, output_block, eos);
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
@@ -153,7 +262,8 @@ Status VSetOperationNode::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-void VSetOperationNode::hash_table_init() {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::hash_table_init() {
     if (_child_expr_lists[0].size() == 1 && (!_build_not_ignore_null[0])) {
         // Single column optimization
         switch (_child_expr_lists[0][0]->root()->result_type()) {
@@ -244,14 +354,56 @@ void VSetOperationNode::hash_table_init() {
     }
 }
 
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::sink(RuntimeState*, Block* block, bool 
eos) {
+    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
+
+    if (block->rows() != 0) {
+        _mem_used += block->allocated_bytes();
+        _mutable_block.merge(*block);
+    }
+
+    if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
+        _build_blocks.emplace_back(_mutable_block.to_block());
+        RETURN_IF_ERROR(process_build_block(_build_blocks[_build_block_index], 
_build_block_index));
+        _mutable_block.clear();
+        ++_build_block_index;
+
+        if (eos) {
+            _build_finished = true;
+        }
+    }
+    return Status::OK();
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::pull(RuntimeState* state, Block* 
output_block, bool* eos) {
+    create_mutable_cols(output_block);
+    auto st = std::visit(
+            [&](auto&& arg) -> Status {
+                using HashTableCtxType = std::decay_t<decltype(arg)>;
+                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                    return get_data_in_hashtable<HashTableCtxType>(arg, 
output_block,
+                                                                   
state->batch_size(), eos);
+                } else {
+                    LOG(FATAL) << "FATAL: uninited hash table";
+                }
+            },
+            *_hash_table_variants);
+    RETURN_IF_ERROR(st);
+    RETURN_IF_ERROR(
+            VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, 
output_block->columns()));
+    reached_limit(output_block, eos);
+    return Status::OK();
+}
+
 //build a hash table from child(0)
-Status VSetOperationNode::hash_table_build(RuntimeState* state) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
     RETURN_IF_ERROR(child(0)->open(state));
     Block block;
     MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
 
-    uint8_t index = 0;
-    int64_t last_mem_used = 0;
     bool eos = false;
     while (!eos) {
         block.clear_column_data();
@@ -259,34 +411,17 @@ Status VSetOperationNode::hash_table_build(RuntimeState* 
state) {
         RETURN_IF_CANCELLED(state);
         
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, 
&eos),
                                        child(0)->get_next_span(), eos);
-
-        size_t allocated_bytes = block.allocated_bytes();
-        _mem_used += allocated_bytes;
-
-        if (block.rows() != 0) {
-            mutable_block.merge(block);
-        }
-
-        // make one block for each 4 gigabytes
-        constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 
1024UL;
-        if (_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE) {
-            _build_blocks.emplace_back(mutable_block.to_block());
-            // TODO:: Rethink may we should do the proess after we recevie all 
build blocks ?
-            // which is better.
-            RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
-            mutable_block = MutableBlock();
-            ++index;
-            last_mem_used = _mem_used;
+        if (eos) {
+            child(0)->close(state);
         }
+        sink(state, &block, eos);
     }
 
-    _build_blocks.emplace_back(mutable_block.to_block());
-    child(0)->close(state);
-    RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
     return Status::OK();
 }
 
-Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::process_build_block(Block& block, 
uint8_t offset) {
     size_t rows = block.rows();
     if (rows == 0) {
         return Status::OK();
@@ -300,8 +435,8 @@ Status VSetOperationNode::process_build_block(Block& block, 
uint8_t offset) {
             [&](auto&& arg) {
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                    HashTableBuild<HashTableCtxType> 
hash_table_build_process(rows, block, raw_ptrs,
-                                                                              
this, offset);
+                    HashTableBuild<HashTableCtxType, is_intersect> 
hash_table_build_process(
+                            rows, raw_ptrs, this, offset);
                     hash_table_build_process(arg);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
@@ -312,28 +447,71 @@ Status VSetOperationNode::process_build_block(Block& 
block, uint8_t offset) {
     return Status::OK();
 }
 
-Status VSetOperationNode::process_probe_block(RuntimeState* state, int 
child_id, bool* eos) {
-    if (!_probe_column_inserted_id.empty()) {
-        for (int j = 0; j < _probe_column_inserted_id.size(); ++j) {
-            auto column_to_erase = _probe_column_inserted_id[j];
-            _probe_block.erase(column_to_erase - j);
-        }
-        _probe_column_inserted_id.clear();
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& 
value,
+                                                         int& block_size) {
+    auto it = value.begin();
+    for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); 
++idx) {
+        auto& column = 
*_build_blocks[it->block_offset].get_by_position(idx->first).column;
+        _mutable_cols[idx->second]->insert_from(column, it->row_num);
+    }
+    block_size++;
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* /*state*/, 
int child_id,
+                                                   Block* block, bool eos) {
+    CHECK(_build_finished) << "cannot sink probe data before build finished";
+    if (child_id > 1) {
+        CHECK(_probe_finished_children_index[child_id - 1])
+                << fmt::format("child with id: {} should be probed first", 
child_id);
+    }
+    auto probe_rows = block->rows();
+
+    if (probe_rows == 0) {
+        return Status::OK();
     }
-    release_block_memory(_probe_block, child_id);
-    _probe_index = 0;
-    _probe_rows = 0;
-
-    RETURN_IF_CANCELLED(state);
-    RETURN_IF_ERROR_AND_CHECK_SPAN(
-            child(child_id)->get_next_after_projects(state, &_probe_block, 
eos),
-            child(child_id)->get_next_span(), *eos);
-    _probe_rows = _probe_block.rows();
-    RETURN_IF_ERROR(extract_probe_column(_probe_block, _probe_columns, 
child_id));
+
+    RETURN_IF_ERROR(extract_probe_column(*block, _probe_columns, child_id));
+
+    return std::visit(
+            [&](auto&& arg) -> Status {
+                using HashTableCtxType = std::decay_t<decltype(arg)>;
+                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                    HashTableProbe<HashTableCtxType, is_intersect> 
process_hashtable_ctx(
+                            this, probe_rows);
+                    return process_hashtable_ctx.mark_data_in_hashtable(arg);
+                } else {
+                    LOG(FATAL) << "FATAL: uninited hash table";
+                }
+            },
+            *_hash_table_variants);
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::finalize_probe(RuntimeState* 
/*state*/, int child_id) {
+    if (child_id != (_children.size() - 1)) {
+        refresh_hash_table();
+        _probe_columns.resize(_child_expr_lists[child_id + 1].size());
+    } else {
+        _can_read = true;
+    }
+    _probe_finished_children_index[child_id] = true;
     return Status::OK();
 }
 
-Status VSetOperationNode::extract_build_column(Block& block, ColumnRawPtrs& 
raw_ptrs) {
+template <bool is_intersect>
+bool VSetOperationNode<is_intersect>::is_child_finished(int child_id) const {
+    if (child_id == 0) {
+        return _build_finished;
+    }
+
+    return _probe_finished_children_index[child_id];
+}
+
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::extract_build_column(Block& block,
+                                                             ColumnRawPtrs& 
raw_ptrs) {
     for (size_t i = 0; i < _child_expr_lists[0].size(); ++i) {
         int result_col_id = -1;
         RETURN_IF_ERROR(_child_expr_lists[0][i]->execute(&block, 
&result_col_id));
@@ -358,9 +536,10 @@ Status VSetOperationNode::extract_build_column(Block& 
block, ColumnRawPtrs& raw_
     return Status::OK();
 }
 
-Status VSetOperationNode::extract_probe_column(Block& block, ColumnRawPtrs& 
raw_ptrs,
-                                               int child_id) {
-    if (_probe_rows == 0) {
+template <bool is_intersect>
+Status VSetOperationNode<is_intersect>::extract_probe_column(Block& block, 
ColumnRawPtrs& raw_ptrs,
+                                                             int child_id) {
+    if (block.rows() == 0) {
         return Status::OK();
     }
 
@@ -395,7 +574,8 @@ Status VSetOperationNode::extract_probe_column(Block& 
block, ColumnRawPtrs& raw_
     return Status::OK();
 }
 
-void VSetOperationNode::create_mutable_cols(Block* output_block) {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::create_mutable_cols(Block* output_block) 
{
     _mutable_cols.resize(_left_table_data_types.size());
     bool mem_reuse = output_block->mem_reuse();
 
@@ -408,7 +588,9 @@ void VSetOperationNode::create_mutable_cols(Block* 
output_block) {
     }
 }
 
-void VSetOperationNode::debug_string(int indentation_level, std::stringstream* 
out) const {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::debug_string(int indentation_level,
+                                                   std::stringstream* out) 
const {
     *out << string(indentation_level * 2, ' ');
     *out << " _child_expr_lists=[";
     for (int i = 0; i < _child_expr_lists.size(); ++i) {
@@ -419,7 +601,8 @@ void VSetOperationNode::debug_string(int indentation_level, 
std::stringstream* o
     *out << ")" << std::endl;
 }
 
-void VSetOperationNode::release_mem() {
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::release_mem() {
     _hash_table_variants = nullptr;
     _arena = nullptr;
 
@@ -429,5 +612,113 @@ void VSetOperationNode::release_mem() {
     _probe_block.clear();
 }
 
+template <bool is_intersect>
+void VSetOperationNode<is_intersect>::refresh_hash_table() {
+    std::visit(
+            [&](auto&& arg) {
+                using HashTableCtxType = std::decay_t<decltype(arg)>;
+                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                    if constexpr (std::is_same_v<typename 
HashTableCtxType::Mapped,
+                                                 RowRefListWithFlags>) {
+                        HashTableCtxType tmp_hash_table;
+                        bool is_need_shrink =
+                                
arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl);
+                        if (is_need_shrink) {
+                            tmp_hash_table.hash_table.init_buf_size(
+                                    _valid_element_in_hash_tbl / 
arg.hash_table.get_factor() + 1);
+                        }
+
+                        arg.init_once();
+                        auto& iter = arg.iter;
+                        auto iter_end = arg.hash_table.end();
+                        while (iter != iter_end) {
+                            auto& mapped = iter->get_second();
+                            auto it = mapped.begin();
+
+                            if constexpr (is_intersect) { //intersected
+                                if (it->visited) {
+                                    it->visited = false;
+                                    if (is_need_shrink) {
+                                        
tmp_hash_table.hash_table.insert(iter->get_value());
+                                    }
+                                    ++iter;
+                                } else {
+                                    if (!is_need_shrink) {
+                                        
arg.hash_table.delete_zero_key(iter->get_first());
+                                        // the ++iter would check if the 
current key is zero. if it does, the iterator will be moved to the container's 
head.
+                                        // so we do ++iter before set_zero to 
make the iterator move to next valid key correctly.
+                                        auto iter_prev = iter;
+                                        ++iter;
+                                        iter_prev->set_zero();
+                                    } else {
+                                        ++iter;
+                                    }
+                                }
+                            } else { //except
+                                if (!it->visited && is_need_shrink) {
+                                    
tmp_hash_table.hash_table.insert(iter->get_value());
+                                }
+                                ++iter;
+                            }
+                        }
+
+                        arg.inited = false;
+                        if (is_need_shrink) {
+                            arg.hash_table = 
std::move(tmp_hash_table.hash_table);
+                        }
+                    } else {
+                        LOG(FATAL) << "FATAL: Invalid RowRefList";
+                    }
+                } else {
+                    LOG(FATAL) << "FATAL: uninited hash table";
+                }
+            },
+            *_hash_table_variants);
+}
+
+template <bool is_intersected>
+template <typename HashTableContext>
+Status 
VSetOperationNode<is_intersected>::get_data_in_hashtable(HashTableContext& 
hash_table_ctx,
+                                                                Block* 
output_block,
+                                                                const int 
batch_size, bool* eos) {
+    hash_table_ctx.init_once();
+    int left_col_len = _left_table_data_types.size();
+    auto& iter = hash_table_ctx.iter;
+    auto block_size = 0;
+
+    if constexpr (std::is_same_v<typename HashTableContext::Mapped, 
RowRefListWithFlags>) {
+        for (; iter != hash_table_ctx.hash_table.end() && block_size < 
batch_size; ++iter) {
+            auto& value = iter->get_second();
+            auto it = value.begin();
+            if constexpr (is_intersected) {
+                if (it->visited) { //intersected: have done probe, so visited 
values it's the result
+                    add_result_columns(value, block_size);
+                }
+            } else {
+                if (!it->visited) { //except: haven't visited values it's the 
needed result
+                    add_result_columns(value, block_size);
+                }
+            }
+        }
+    } else {
+        LOG(FATAL) << "Invalid RowRefListType!";
+    }
+
+    *eos = iter == hash_table_ctx.hash_table.end();
+    if (!output_block->mem_reuse()) {
+        for (int i = 0; i < left_col_len; ++i) {
+            
output_block->insert(ColumnWithTypeAndName(std::move(_mutable_cols[i]),
+                                                       
_left_table_data_types[i], ""));
+        }
+    } else {
+        _mutable_cols.clear();
+    }
+
+    return Status::OK();
+}
+
+template class VSetOperationNode<true>;
+template class VSetOperationNode<false>;
+
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/exec/vset_operation_node.h 
b/be/src/vec/exec/vset_operation_node.h
index b93464ba87..1e8b5e1ebe 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -31,20 +31,35 @@ namespace doris {
 
 namespace vectorized {
 
-class VSetOperationNode : public ExecNode {
+template <bool is_intersect>
+class VSetOperationNode final : public ExecNode {
 public:
     VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
 
-    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
-    virtual Status prepare(RuntimeState* state);
-    virtual Status open(RuntimeState* state);
-    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos) {
-        return Status::NotSupported("Not Implemented get RowBatch in vecorized 
execution.");
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+    Status get_next(RuntimeState* /*state*/, RowBatch* /*row_batch*/, bool* 
/*eos*/) override {
+        return Status::NotSupported("Not implemented get RowBatch in 
vectorized execution.");
     }
-    virtual Status close(RuntimeState* state);
-    virtual void debug_string(int indentation_level, std::stringstream* out) 
const;
 
-protected:
+    Status get_next(RuntimeState* state, Block* output_block, bool* eos) 
override;
+
+    Status close(RuntimeState* state) override;
+    void debug_string(int indentation_level, std::stringstream* out) const 
override;
+
+    Status alloc_resource(RuntimeState* state) override;
+    void release_resource(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, Block* block, bool eos) override;
+    Status pull(RuntimeState* state, Block* output_block, bool* eos) override;
+
+    Status sink_probe(RuntimeState* state, int child_id, Block* block, bool 
eos);
+    Status finalize_probe(RuntimeState* state, int child_id);
+
+    bool is_child_finished(int child_id) const;
+
+private:
     //Todo: In build process of hashtable, It's same as join node.
     //It's time to abstract out the same methods and provide them directly to 
others;
     void hash_table_init();
@@ -52,13 +67,17 @@ protected:
     Status process_build_block(Block& block, uint8_t offset);
     Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
     Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int 
child_id);
-    template <bool keep_matched>
     void refresh_hash_table();
-    Status process_probe_block(RuntimeState* state, int child_id, bool* eos);
+
+    template <typename HashTableContext>
+    Status get_data_in_hashtable(HashTableContext& hash_table_ctx, Block* 
output_block,
+                                 const int batch_size, bool* eos);
+
+    void add_result_columns(RowRefListWithFlags& value, int& block_size);
+
     void create_mutable_cols(Block* output_block);
     void release_mem();
 
-protected:
     std::unique_ptr<HashTableVariants> _hash_table_variants;
 
     std::vector<size_t> _probe_key_sz;
@@ -85,207 +104,21 @@ protected:
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     std::vector<MutableColumnPtr> _mutable_cols;
-    int _probe_index;
-    size_t _probe_rows;
+    int _build_block_index;
+    bool _build_finished;
+    std::vector<bool> _probe_finished_children_index;
+    MutableBlock _mutable_block;
     RuntimeProfile::Counter* _build_timer; // time to build hash table
     RuntimeProfile::Counter* _probe_timer; // time to probe
 
-    template <class HashTableContext>
+    template <class HashTableContext, bool is_intersected>
     friend struct HashTableBuild;
     template <class HashTableContext, bool is_intersected>
     friend struct HashTableProbe;
 };
 
-template <bool keep_matched>
-void VSetOperationNode::refresh_hash_table() {
-    std::visit(
-            [&](auto&& arg) {
-                using HashTableCtxType = std::decay_t<decltype(arg)>;
-                if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                    if constexpr (std::is_same_v<typename 
HashTableCtxType::Mapped,
-                                                 RowRefListWithFlags>) {
-                        HashTableCtxType tmp_hash_table;
-                        bool is_need_shrink =
-                                
arg.hash_table.should_be_shrink(_valid_element_in_hash_tbl);
-                        if (is_need_shrink) {
-                            tmp_hash_table.hash_table.init_buf_size(
-                                    _valid_element_in_hash_tbl / 
arg.hash_table.get_factor() + 1);
-                        }
-
-                        arg.init_once();
-                        auto& iter = arg.iter;
-                        auto iter_end = arg.hash_table.end();
-                        while (iter != iter_end) {
-                            auto& mapped = iter->get_second();
-                            auto it = mapped.begin();
-
-                            if constexpr (keep_matched) { //intersected
-                                if (it->visited) {
-                                    it->visited = false;
-                                    if (is_need_shrink) {
-                                        
tmp_hash_table.hash_table.insert(iter->get_value());
-                                    }
-                                    ++iter;
-                                } else {
-                                    if (!is_need_shrink) {
-                                        
arg.hash_table.delete_zero_key(iter->get_first());
-                                        // the ++iter would check if the 
current key is zero. if it does, the iterator will be moved to the container's 
head.
-                                        // so we do ++iter before set_zero to 
make the iterator move to next valid key correctly.
-                                        auto iter_prev = iter;
-                                        ++iter;
-                                        iter_prev->set_zero();
-                                    } else {
-                                        ++iter;
-                                    }
-                                }
-                            } else { //except
-                                if (!it->visited && is_need_shrink) {
-                                    
tmp_hash_table.hash_table.insert(iter->get_value());
-                                }
-                                ++iter;
-                            }
-                        }
-
-                        arg.inited = false;
-                        if (is_need_shrink) {
-                            arg.hash_table = 
std::move(tmp_hash_table.hash_table);
-                        }
-                    } else {
-                        LOG(FATAL) << "FATAL: Invalid RowRefList";
-                    }
-                } else {
-                    LOG(FATAL) << "FATAL: uninited hash table";
-                }
-            },
-            *_hash_table_variants);
-}
-
-template <class HashTableContext, bool is_intersected>
-struct HashTableProbe {
-    HashTableProbe(VSetOperationNode* operation_node, int batch_size, int 
probe_rows)
-            : _operation_node(operation_node),
-              _left_table_data_types(operation_node->_left_table_data_types),
-              _batch_size(batch_size),
-              _probe_rows(probe_rows),
-              _build_blocks(operation_node->_build_blocks),
-              _probe_block(operation_node->_probe_block),
-              _probe_index(operation_node->_probe_index),
-              _num_rows_returned(operation_node->_num_rows_returned),
-              _probe_raw_ptrs(operation_node->_probe_columns),
-              _rows_returned_counter(operation_node->_rows_returned_counter),
-              _build_col_idx(operation_node->_build_col_idx),
-              _mutable_cols(operation_node->_mutable_cols) {}
-
-    Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) {
-        using KeyGetter = typename HashTableContext::State;
-        using Mapped = typename HashTableContext::Mapped;
-
-        KeyGetter key_getter(_probe_raw_ptrs, _operation_node->_probe_key_sz, 
nullptr);
-
-        if (_probe_index == 0) {
-            _arena.reset(new Arena());
-            if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
-                if (_probe_keys.size() < _probe_rows) {
-                    _probe_keys.resize(_probe_rows);
-                }
-                size_t keys_size = _probe_raw_ptrs.size();
-                for (size_t i = 0; i < _probe_rows; ++i) {
-                    _probe_keys[i] = serialize_keys_to_pool_contiguous(i, 
keys_size,
-                                                                       
_probe_raw_ptrs, *_arena);
-                }
-            }
-        }
-
-        if constexpr 
(ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
-            key_getter.set_serialized_keys(_probe_keys.data());
-        }
-
-        if constexpr (std::is_same_v<typename HashTableContext::Mapped, 
RowRefListWithFlags>) {
-            for (; _probe_index < _probe_rows;) {
-                auto find_result =
-                        key_getter.find_key(hash_table_ctx.hash_table, 
_probe_index, *_arena);
-                if (find_result.is_found()) { //if found, marked visited
-                    auto it = find_result.get_mapped().begin();
-                    if (!(it->visited)) {
-                        it->visited = true;
-                        if constexpr (is_intersected) //intersected
-                            _operation_node->_valid_element_in_hash_tbl++;
-                        else
-                            _operation_node->_valid_element_in_hash_tbl--; 
//except
-                    }
-                }
-                _probe_index++;
-            }
-        } else {
-            LOG(FATAL) << "Invalid RowRefListType!";
-        }
-        return Status::OK();
-    }
-
-    void add_result_columns(RowRefListWithFlags& value, int& block_size) {
-        auto it = value.begin();
-        for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); 
++idx) {
-            auto& column = 
*_build_blocks[it->block_offset].get_by_position(idx->first).column;
-            _mutable_cols[idx->second]->insert_from(column, it->row_num);
-        }
-        block_size++;
-    }
-
-    Status get_data_in_hashtable(HashTableContext& hash_table_ctx,
-                                 std::vector<MutableColumnPtr>& mutable_cols, 
Block* output_block,
-                                 bool* eos) {
-        hash_table_ctx.init_once();
-        int left_col_len = _left_table_data_types.size();
-        auto& iter = hash_table_ctx.iter;
-        auto block_size = 0;
-
-        if constexpr (std::is_same_v<typename HashTableContext::Mapped, 
RowRefListWithFlags>) {
-            for (; iter != hash_table_ctx.hash_table.end() && block_size < 
_batch_size; ++iter) {
-                auto& value = iter->get_second();
-                auto it = value.begin();
-                if constexpr (is_intersected) {
-                    if (it->visited) { //intersected: have done probe, so 
visited values it's the result
-                        add_result_columns(value, block_size);
-                    }
-                } else {
-                    if (!it->visited) { //except: haven't visited values it's 
the needed result
-                        add_result_columns(value, block_size);
-                    }
-                }
-            }
-        } else {
-            LOG(FATAL) << "Invalid RowRefListType!";
-        }
-
-        *eos = iter == hash_table_ctx.hash_table.end();
-        if (!output_block->mem_reuse()) {
-            for (int i = 0; i < left_col_len; ++i) {
-                
output_block->insert(ColumnWithTypeAndName(std::move(_mutable_cols[i]),
-                                                           
_left_table_data_types[i], ""));
-            }
-        } else {
-            _mutable_cols.clear();
-        }
-
-        return Status::OK();
-    }
-
-private:
-    VSetOperationNode* _operation_node;
-    const DataTypes& _left_table_data_types;
-    const int _batch_size;
-    const size_t _probe_rows;
-    const std::vector<Block>& _build_blocks;
-    const Block& _probe_block;
-    int& _probe_index;
-    int64_t& _num_rows_returned;
-    ColumnRawPtrs& _probe_raw_ptrs;
-    std::unique_ptr<Arena> _arena;
-    std::vector<StringRef> _probe_keys;
-    RuntimeProfile::Counter* _rows_returned_counter;
-    std::unordered_map<int, int>& _build_col_idx;
-    std::vector<MutableColumnPtr>& _mutable_cols;
-};
+using VIntersectNode = VSetOperationNode<true>;
+using VExceptNode = VSetOperationNode<false>;
 
 } // namespace vectorized
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to