This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.4.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 294e4aeb1bf86268e9ea7db23e8afaf64327104c Author: Riza Suminto <[email protected]> AuthorDate: Mon Jun 3 19:58:54 2024 -0700 IMPALA-13129: Move runtime filter skipping at registerRuntimeFilter A DCHECK in hdfs-scanner.h was hit when skipping a MIN_MAX runtime filter using RUNTIME_FILTER_IDS_TO_SKIP query option. This is because HdfsScanNode.tryToComputeOverlapPredicate() is called and register a TOverlapPredicateDesc during runtime filter generation, but the minmax filter is then skipped later, causing backend to hit DCHECK. This patch move the runtime filter skipping at registerRuntimeFilter() so that HdfsScanNode.tryToComputeOverlapPredicate() will not be called at all once a filter is skipped. Testing: - Add test in overlap_min_max_filters.test to explicitly skip a minmax runtime filter. - Pass test_runtime_filters.py Change-Id: I43c1c4abc88019aadaa85d2e3d0ecda417297bfc Reviewed-on: http://gerrit.cloudera.org:8080/21477 Reviewed-by: Wenzhe Zhou <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-scanner.h | 2 +- .../impala/planner/RuntimeFilterGenerator.java | 48 ++++++++++------------ .../queries/QueryTest/overlap_min_max_filters.test | 18 ++++++++ 3 files changed, 40 insertions(+), 28 deletions(-) diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 1d277a300..b1ab913ee 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -196,7 +196,7 @@ class HdfsScanner { return i; } } - DCHECK(false); + DCHECK(false) << "Runtime filter id " << filter_id << " not found!"; return -1; } diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java index c908e17d3..782d68581 100644 --- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java +++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java @@ -778,9 +778,9 @@ public final class RuntimeFilterGenerator { /** * Return true if this filter is deemed highly selective, will arrive on-time, and * applied for all rows without ever being disabled by the scanner node. - * Mainly used by {@link ScanNode#reduceCardinalityByRuntimeFilter(java.util.Stack)} - * to lower the scan cardinality estimation. These properties are hard to predict - * during planning, so this method is set very conservative to avoid severe + * Mainly used by {@link ScanNode#reduceCardinalityByRuntimeFilter(java.util.Stack, + * double)} to lower the scan cardinality estimation. These properties are hard to + * predict during planning, so this method is set very conservative to avoid severe * underestimation. */ public boolean isHighlySelective() { @@ -888,8 +888,7 @@ public final class RuntimeFilterGenerator { RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator( ctx.getQueryOptions()); filterGenerator.generateFilters(ctx, plan); - List<RuntimeFilter> filters = - Lists.newArrayList(filterGenerator.getRuntimeFilters(ctx)); + List<RuntimeFilter> filters = Lists.newArrayList(filterGenerator.getRuntimeFilters()); if (filters.size() > maxNumBloomFilters) { // If more than 'maxNumBloomFilters' were generated, sort them by increasing // selectivity and keep the 'maxNumBloomFilters' most selective bloom filters. @@ -1036,18 +1035,10 @@ public final class RuntimeFilterGenerator { /** * Returns a list of all the registered runtime filters, ordered by filter ID. */ - public List<RuntimeFilter> getRuntimeFilters(PlannerContext ctx) { + public List<RuntimeFilter> getRuntimeFilters() { Set<RuntimeFilter> resultSet = new HashSet<>(); for (List<RuntimeFilter> filters: runtimeFiltersByTid_.values()) { - for (RuntimeFilter filter : filters) { - if (ctx.getQueryOptions().isSetRuntime_filter_ids_to_skip() - && ctx.getQueryOptions().runtime_filter_ids_to_skip.contains( - filter.getFilterId().asInt())) { - // Skip this filter because it is explicitly excluded via query option. - continue; - } - resultSet.add(filter); - } + resultSet.addAll(filters); } List<RuntimeFilter> resultList = Lists.newArrayList(resultSet); Collections.sort(resultList, new Comparator<RuntimeFilter>() { @@ -1112,7 +1103,7 @@ public final class RuntimeFilterGenerator { ctx.getRootAnalyzer(), conjunct, joinNode, filterType, filterSizeLimits_, /* isTimestampTruncation */ false, level); if (filter != null) { - registerRuntimeFilter(filter); + registerRuntimeFilter(ctx, filter); filters.add(filter); } // For timestamp bloom filters, we also generate a RuntimeFilter with the @@ -1125,7 +1116,7 @@ public final class RuntimeFilterGenerator { ctx.getRootAnalyzer(), conjunct, joinNode, filterType, filterSizeLimits_, /* isTimestampTruncation */ true, level); if (filter2 == null) continue; - registerRuntimeFilter(filter2); + registerRuntimeFilter(ctx, filter2); filters.add(filter2); } } @@ -1149,26 +1140,29 @@ public final class RuntimeFilterGenerator { * Registers a runtime filter with the tuple id of every scan node that is a candidate * destination node for that filter. */ - private void registerRuntimeFilter(RuntimeFilter filter) { + private void registerRuntimeFilter(PlannerContext ctx, RuntimeFilter filter) { Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots(); Preconditions.checkState(targetSlotsByTid != null && !targetSlotsByTid.isEmpty()); for (TupleId tupleId: targetSlotsByTid.keySet()) { - registerRuntimeFilter(filter, tupleId); + registerRuntimeFilter(ctx, filter, tupleId); } } /** * Registers a runtime filter with a specific target tuple id. */ - private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) { - Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid)); - List<RuntimeFilter> filters = runtimeFiltersByTid_.get(targetTid); - if (filters == null) { - filters = new ArrayList<>(); - runtimeFiltersByTid_.put(targetTid, filters); + private void registerRuntimeFilter( + PlannerContext ctx, RuntimeFilter filter, TupleId targetTid) { + Preconditions.checkArgument(filter.getTargetSlots().containsKey(targetTid), + "filter does not contain target slot!"); + Preconditions.checkArgument(!filter.isFinalized(), "filter must not finalized yet!"); + if (ctx.getQueryOptions().isSetRuntime_filter_ids_to_skip() + && ctx.getQueryOptions().getRuntime_filter_ids_to_skip().contains( + filter.getFilterId().asInt())) { + // Skip this filter because it is explicitly excluded via query option. + return; } - Preconditions.checkState(!filter.isFinalized()); - filters.add(filter); + runtimeFiltersByTid_.computeIfAbsent(targetTid, t -> new ArrayList<>()).add(filter); } /** diff --git a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test index d450bb1d0..bfbf19ac4 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test +++ b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test @@ -185,6 +185,24 @@ aggregation(SUM, NumRuntimeFilteredPages): 255 ==== ---- QUERY ################################################### +# Test that explicitly skipping minmax runtime filter does not crash backend. +# Expect to see a total of 0 pages filtered out. +################################################### +SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; +SET MINMAX_FILTER_THRESHOLD=1.0; +SET MINMAX_FILTERING_LEVEL=PAGE; +SET RUNTIME_FILTER_IDS_TO_SKIP=1; +select straight_join count(*) +from lineitem_sorted_l_extendedprice a join [SHUFFLE] +lineitem_sorted_l_extendedprice b +where a.l_extendedprice = b.l_extendedprice and b.l_orderkey = 1; +---- RESULTS +36 +---- RUNTIME_PROFILE +aggregation(SUM, NumRuntimeFilteredPages): 0 +==== +---- QUERY +################################################### # Join the above sorted column with itself with # overlap filtering on sorted columns disabled. # Expect to see the same result with 0 pages
