This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f1ad42e8318 [fix](be) Fix TopN runtime filter activation (#63969)
f1ad42e8318 is described below
commit f1ad42e83189d02074861830377a813f79fad31a
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 2 10:15:34 2026 +0800
[fix](be) Fix TopN runtime filter activation (#63969)
PR #59088 changed TopN runtime predicate target
initialization to rely on a storage column id. For targets that cannot
create a storage column predicate, such as non-pushdown TopN predicates
or unsupported storage columns, init_target returned before marking the
target as detected. That left RuntimePredicate disabled, so the scan
side ignored the TopN source even though FE had sent the source id. This
PR keeps the target detected when no storage predicate is created,
removes obsolete compatibility skips for missing runtime predicate
descriptors, and adds FE/BE test coverage for the source-marking and
no-column target paths.
---
be/src/exec/operator/scan_operator.cpp | 5 --
be/src/exec/operator/scan_operator.h | 5 --
be/src/runtime/runtime_predicate.cpp | 1 +
be/test/runtime/runtime_predicate_test.cpp | 86 ++++++++++++++++++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 1 -
.../doris/qe/runtime/ThriftPlansBuilder.java | 9 +--
.../java/org/apache/doris/qe/CoordinatorTest.java | 30 ++++++++
.../doris/qe/runtime/ThriftPlansBuilderTest.java | 39 ++++++++++
8 files changed, 159 insertions(+), 17 deletions(-)
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index 3821b89aca5..921c3e9291e 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1250,11 +1250,6 @@ Status
ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
_slot_id_to_slot_desc[slot->id()] = slot;
}
for (auto id : _topn_filter_source_node_ids) {
- if (!state->get_query_ctx()->has_runtime_predicate(id)) {
- // compatible with older versions fe
- continue;
- }
-
int cid = -1;
if
(state->get_query_ctx()->get_runtime_predicate(id).target_is_slot(node_id())) {
auto s = _slot_id_to_slot_desc[state->get_query_ctx()
diff --git a/be/src/exec/operator/scan_operator.h
b/be/src/exec/operator/scan_operator.h
index f2ba87d4291..e3809b2e7d7 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -258,11 +258,6 @@ class ScanLocalState : public ScanLocalStateBase {
std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool
push_down) {
std::vector<int> result;
for (int id : _parent->cast<typename
Derived::Parent>()._topn_filter_source_node_ids) {
- if (!state->get_query_ctx()->has_runtime_predicate(id)) {
- // compatible with older versions fe
- continue;
- }
-
const auto& pred =
state->get_query_ctx()->get_runtime_predicate(id);
if (!pred.enable()) {
continue;
diff --git a/be/src/runtime/runtime_predicate.cpp
b/be/src/runtime/runtime_predicate.cpp
index 893759e2694..61d518e2bee 100644
--- a/be/src/runtime/runtime_predicate.cpp
+++ b/be/src/runtime/runtime_predicate.cpp
@@ -59,6 +59,7 @@ Status RuntimePredicate::init_target(
int32_t target_node_id, phmap::flat_hash_map<int, SlotDescriptor*>
slot_id_to_slot_desc,
const int column_id) {
if (column_id < 0) {
+ _detected_target = true;
return Status::OK();
}
std::unique_lock<std::shared_mutex> wlock(_rwlock);
diff --git a/be/test/runtime/runtime_predicate_test.cpp
b/be/test/runtime/runtime_predicate_test.cpp
new file mode 100644
index 00000000000..77e65b569a4
--- /dev/null
+++ b/be/test/runtime/runtime_predicate_test.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/runtime_predicate.h"
+
+#include <gtest/gtest.h>
+
+#include "core/data_type/data_type_factory.hpp"
+#include "core/field.h"
+#include "exec/pipeline/thrift_builder.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+namespace {
+
+constexpr TPlanNodeId SOURCE_NODE_ID = 10;
+constexpr TPlanNodeId TARGET_NODE_ID = 20;
+constexpr SlotId SLOT_ID = 0;
+
+TTopnFilterDesc create_topn_filter_desc() {
+ auto target_expr = TRuntimeFilterDescBuilder::get_default_expr();
+
+ TTopnFilterDesc desc;
+ desc.__set_source_node_id(SOURCE_NODE_ID);
+ desc.__set_is_asc(true);
+ desc.__set_null_first(false);
+ desc.__set_target_node_id_to_target_expr({{TARGET_NODE_ID, target_expr}});
+ return desc;
+}
+
+SlotDescriptor create_int_slot_descriptor() {
+ SlotDescriptor slot_desc;
+ slot_desc._id = SLOT_ID;
+ slot_desc._col_name = "k1";
+ slot_desc._type =
DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, false);
+ return slot_desc;
+}
+
+} // namespace
+
+TEST(RuntimePredicateTest,
init_target_creates_column_predicate_for_valid_column_id) {
+ RuntimePredicate predicate(create_topn_filter_desc());
+ predicate.set_detected_source();
+
+ auto slot_desc = create_int_slot_descriptor();
+ phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+ slot_id_to_slot_desc[SLOT_ID] = &slot_desc;
+
+ ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc,
0).ok());
+
+ EXPECT_TRUE(predicate.enable());
+ EXPECT_EQ("k1", predicate.get_col_name(TARGET_NODE_ID));
+ EXPECT_NE(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+}
+
+TEST(RuntimePredicateTest,
init_target_without_column_predicate_still_enables_runtime_filter) {
+ RuntimePredicate predicate(create_topn_filter_desc());
+ predicate.set_detected_source();
+
+ phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc;
+ ASSERT_TRUE(predicate.init_target(TARGET_NODE_ID, slot_id_to_slot_desc,
-1).ok());
+
+ EXPECT_TRUE(predicate.enable());
+ EXPECT_EQ(nullptr, predicate.get_predicate(TARGET_NODE_ID));
+
+ auto top_value = Field::create_field<TYPE_INT>(10);
+ ASSERT_TRUE(predicate.update(top_value).ok());
+ EXPECT_TRUE(predicate.has_value());
+ EXPECT_EQ(top_value, predicate.get_value());
+}
+
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5181a77b042..b8bdc51f371 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3234,7 +3234,6 @@ public class Coordinator implements CoordInterface {
Map<TNetworkAddress, TPipelineFragmentParams> toThrift(int backendNum)
{
Set<SortNode> topnSortNodes = scanNodes.stream()
- .filter(scanNode -> scanNode instanceof OlapScanNode)
.flatMap(scanNode ->
scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 2ccb96912d7..6c6799b51ad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -41,7 +41,6 @@ import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
-import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
@@ -190,12 +189,10 @@ public class ThriftPlansBuilder {
return workerToInstances;
}
- private static void setRuntimePredicateIfNeed(Collection<ScanNode>
scanNodes) {
+ static void setRuntimePredicateIfNeed(Collection<ScanNode> scanNodes) {
for (ScanNode scanNode : scanNodes) {
- if (scanNode instanceof OlapScanNode) {
- for (SortNode topnFilterSortNode :
scanNode.getTopnFilterSortNodes()) {
- topnFilterSortNode.setHasRuntimePredicate();
- }
+ for (SortNode topnFilterSortNode :
scanNode.getTopnFilterSortNodes()) {
+ topnFilterSortNode.setHasRuntimePredicate();
}
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index cc406858366..1cdc3163d90 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -17,15 +17,20 @@
package org.apache.doris.qe;
+import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineInstanceParams;
+import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TTopnFilterDesc;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.utframe.TestWithFeService;
@@ -34,9 +39,11 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -121,6 +128,29 @@ public class CoordinatorTest extends TestWithFeService {
}
}
+ @Test
+ public void testFragmentExecParamsMarksNonOlapTopnFilterSource() {
+ ScanNode scanNode = Mockito.mock(ScanNode.class);
+ SortNode sortNode = Mockito.mock(SortNode.class);
+
Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+ PlanFragment fragment = Mockito.mock(PlanFragment.class);
+ Mockito.when(fragment.getFragmentId()).thenReturn(new
PlanFragmentId(0));
+ Mockito.when(fragment.toThrift()).thenReturn(new TPlanFragment());
+
Mockito.when(fragment.isTransferQueryStatisticsWithEveryBatch()).thenReturn(false);
+
+ Coordinator.FragmentExecParams fragParams = new Coordinator(0L, new
TUniqueId(1L, 1L),
+ new DescriptorTable(), Collections.singletonList(fragment),
Collections.singletonList(scanNode),
+ "UTC", false, false).new FragmentExecParams(fragment);
+ TNetworkAddress host = new TNetworkAddress("127.0.0.1", 9060);
+ fragParams.instanceExecParams.add(
+ new Coordinator.FInstanceExecParam(new TUniqueId(2L, 2L),
host, fragParams));
+
+ fragParams.toThrift(0);
+
+ Mockito.verify(sortNode).setHasRuntimePredicate();
+ }
+
private NereidsPlanner plan(String sql) throws IOException {
connectContext.getSessionVariable().setDisableNereidsRules(
"PRUNE_EMPTY_PARTITION,OLAP_SCAN_TABLET_PRUNE");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
new file mode 100644
index 00000000000..88140c0cb2e
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/qe/runtime/ThriftPlansBuilderTest.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe.runtime;
+
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.planner.SortNode;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class ThriftPlansBuilderTest {
+ @Test
+ public void testSetRuntimePredicateForNonOlapScanNode() {
+ ScanNode scanNode = Mockito.mock(ScanNode.class);
+ SortNode sortNode = Mockito.mock(SortNode.class);
+
Mockito.when(scanNode.getTopnFilterSortNodes()).thenReturn(Collections.singletonList(sortNode));
+
+
ThriftPlansBuilder.setRuntimePredicateIfNeed(Collections.singletonList(scanNode));
+
+ Mockito.verify(sortNode).setHasRuntimePredicate();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]