This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch rf-thrift-poc
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/rf-thrift-poc by this push:
new dcc240db083 move rf info from instance level to be level (#49474)
dcc240db083 is described below
commit dcc240db083b246ae541afeeb9fd71b33c72b1d2
Author: minghong <[email protected]>
AuthorDate: Tue Mar 25 18:25:55 2025 +0800
Move runtime filter (RF) info from instance level to BE (backend) level (#49474)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
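Problem Summary:
Runtime filter parameters and TopN filter descriptors were previously set on
every TPipelineInstanceParams in ThriftPlansBuilder.instanceToThrift. This
change builds them once per backend in a new beToThrift helper and carries them
in a new thrift struct, TRuntimeFilterInfo, attached to
TPipelineFragmentParamsList as field 14 (runtime_filter_info). The
runtime_filter_params and topn_filter_source_node_ids fields of
TPlanFragmentExecParams are marked deprecated.
The commit also makes EliminateLogicalSelectHint handle the NO_USE_CBO_RULE
hint in the same branch as USE_CBO_RULE, and adds a default
SetOperation.isDistinct() method.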
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../rules/analysis/EliminateLogicalSelectHint.java | 3 +-
.../nereids/trees/plans/algebra/SetOperation.java | 4 +++
.../doris/qe/runtime/ThriftPlansBuilder.java | 39 ++++++++++++----------
gensrc/thrift/PaloInternalService.thrift | 13 ++++++--
4 files changed, 38 insertions(+), 21 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
index f7327100006..43ef5755559 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/EliminateLogicalSelectHint.java
@@ -66,7 +66,8 @@ public class EliminateLogicalSelectHint extends
OneRewriteRuleFactory {
} else if (hintName.equalsIgnoreCase("LEADING")) {
extractLeading((SelectHintLeading) hint,
ctx.cascadesContext,
ctx.statementContext, selectHintPlan);
- } else if (hintName.equalsIgnoreCase("USE_CBO_RULE")) {
+ } else if (hintName.equalsIgnoreCase("USE_CBO_RULE")
+ || hintName.equalsIgnoreCase("NO_USE_CBO_RULE")) {
extractRule((SelectHintUseCboRule) hint,
ctx.statementContext);
} else if (hintName.equalsIgnoreCase("USE_MV")) {
extractMv((SelectHintUseMv) hint,
ConnectContext.get().getStatementContext());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
index dedd877eead..0407348721f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/SetOperation.java
@@ -43,4 +43,8 @@ public interface SetOperation {
List<List<SlotReference>> getRegularChildrenOutputs();
int getArity();
+
+ default boolean isDistinct() {
+ return getQualifier() == Qualifier.DISTINCT;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 9b114d85ea4..475891ec0f2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -52,6 +52,7 @@ import org.apache.doris.thrift.TPipelineInstanceParams;
import org.apache.doris.thrift.TPlanFragment;
import org.apache.doris.thrift.TPlanFragmentDestination;
import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TRuntimeFilterInfo;
import org.apache.doris.thrift.TRuntimeFilterParams;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TTopnFilterDesc;
@@ -112,9 +113,7 @@ public class ThriftPlansBuilder {
workerProcessInstanceNum, coordinatorContext);
TPipelineInstanceParams instanceParam = instanceToThrift(
- currentFragmentParam, instanceJob,
runtimeFiltersThriftBuilder,
- topNFilterThriftSupplier, currentInstanceIndex++
- );
+ currentFragmentParam, instanceJob,
currentInstanceIndex++);
currentFragmentParam.getLocalParams().add(instanceParam);
}
@@ -122,7 +121,8 @@ public class ThriftPlansBuilder {
// so we can merge and send multiple fragment to a backend use one
rpc
for (Entry<DistributedPlanWorker, TPipelineFragmentParams> kv :
workerToCurrentFragment.entrySet()) {
TPipelineFragmentParamsList fragments =
fragmentsGroupByWorker.computeIfAbsent(
- kv.getKey(), w -> new TPipelineFragmentParamsList());
+ kv.getKey(), w ->
beToThrift(runtimeFiltersThriftBuilder,
+ topNFilterThriftSupplier));
fragments.addToParamsList(kv.getValue());
}
}
@@ -293,6 +293,22 @@ public class ThriftPlansBuilder {
return destination;
}
+ private static TPipelineFragmentParamsList beToThrift(
+ RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
+ Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier) {
+ TPipelineFragmentParamsList beParam = new
TPipelineFragmentParamsList();
+ TRuntimeFilterInfo runtimeFilterInfo = new TRuntimeFilterInfo();
+ runtimeFilterInfo.setTopnFilterDescs(topNFilterThriftSupplier.get());
+
+ // set for runtime filter
+ TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
+
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
+ runtimeFilterInfo.setRuntimeFilterParams(runtimeFilterParams);
+
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
+ beParam.setRuntimeFilterInfo(runtimeFilterInfo);
+ return beParam;
+ }
+
private static TPipelineFragmentParams fragmentToThriftIfAbsent(
PipelineDistributedPlan fragmentPlan, AssignedJob assignedJob,
Map<DistributedPlanWorker, TPipelineFragmentParams>
workerToFragmentParams,
@@ -401,26 +417,13 @@ public class ThriftPlansBuilder {
}
private static TPipelineInstanceParams instanceToThrift(
- TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
- RuntimeFiltersThriftBuilder runtimeFiltersThriftBuilder,
- Supplier<List<TTopnFilterDesc>> topNFilterThriftSupplier, int
currentInstanceNum) {
+ TPipelineFragmentParams currentFragmentParam, AssignedJob
instance, int currentInstanceNum) {
TPipelineInstanceParams instanceParam = new TPipelineInstanceParams();
instanceParam.setFragmentInstanceId(instance.instanceId());
setScanSourceParam(currentFragmentParam, instance, instanceParam);
instanceParam.setSenderId(instance.indexInUnassignedJob());
instanceParam.setBackendNum(currentInstanceNum);
- instanceParam.setRuntimeFilterParams(new TRuntimeFilterParams());
-
- instanceParam.setTopnFilterDescs(topNFilterThriftSupplier.get());
-
- // set for runtime filter
- TRuntimeFilterParams runtimeFilterParams = new TRuntimeFilterParams();
-
runtimeFilterParams.setRuntimeFilterMergeAddr(runtimeFiltersThriftBuilder.mergeAddress);
- instanceParam.setRuntimeFilterParams(runtimeFilterParams);
- if
(runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
-
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
- }
boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
if (isLocalShuffle) {
// a fragment in a backend only enable local shuffle once for the
first local shuffle instance,
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index d2805ba83cb..17b531d1248 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -470,9 +470,9 @@ struct TPlanFragmentExecParams {
10: optional i32 num_senders
11: optional bool send_query_statistics_with_every_batch
// Used to merge and send runtime filter
- 12: optional TRuntimeFilterParams runtime_filter_params
+ 12: optional TRuntimeFilterParams runtime_filter_params //deprecated
13: optional bool group_commit // deprecated
- 14: optional list<i32> topn_filter_source_node_ids
+ 14: optional list<i32> topn_filter_source_node_ids //deprecated
}
// Global query parameters assigned by the coordinator.
@@ -803,6 +803,14 @@ struct TPipelineFragmentParams {
1000: optional bool is_mow_table;
}
+// pull up runtime filter info from instance level to BE level
+struct TRuntimeFilterInfo {
+ // for join runtime filter and setop runtime filter
+ 1: optional TRuntimeFilterParams runtime_filter_params
+ // for topn runtime filter
+ 2: optional list<PlanNodes.TTopnFilterDesc> topn_filter_descs
+}
+
struct TPipelineFragmentParamsList {
1: optional list<TPipelineFragmentParams> params_list;
2: optional Descriptors.TDescriptorTable desc_tbl;
@@ -819,4 +827,5 @@ struct TPipelineFragmentParamsList {
11: optional Types.TUniqueId query_id
12: optional list<i32> topn_filter_source_node_ids
13: optional Types.TNetworkAddress runtime_filter_merge_addr
+ 14: optional TRuntimeFilterInfo runtime_filter_info
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]