This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new e2e45401e IMPALA-13129: Move runtime filter skipping at 
registerRuntimeFilter
e2e45401e is described below

commit e2e45401e2bead4090fd5c562709db521cbc6d38
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Jun 3 19:58:54 2024 -0700

    IMPALA-13129: Move runtime filter skipping at registerRuntimeFilter
    
    A DCHECK in hdfs-scanner.h was hit when skipping a MIN_MAX runtime
    filter using the RUNTIME_FILTER_IDS_TO_SKIP query option. This is because
    HdfsScanNode.tryToComputeOverlapPredicate() is called and registers a
    TOverlapPredicateDesc during runtime filter generation, but the minmax
    filter is then skipped later, causing the backend to hit the DCHECK.
    
    This patch moves the runtime filter skipping into registerRuntimeFilter()
    so that HdfsScanNode.tryToComputeOverlapPredicate() will not be called
    at all once a filter is skipped.
    
    Testing:
    - Add test in overlap_min_max_filters.test to explicitly skip a minmax
      runtime filter.
    - Pass test_runtime_filters.py
    
    Change-Id: I43c1c4abc88019aadaa85d2e3d0ecda417297bfc
    Reviewed-on: http://gerrit.cloudera.org:8080/21477
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-scanner.h                         |  2 +-
 .../impala/planner/RuntimeFilterGenerator.java     | 48 ++++++++++------------
 .../queries/QueryTest/overlap_min_max_filters.test | 18 ++++++++
 3 files changed, 40 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 1d277a300..b1ab913ee 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -196,7 +196,7 @@ class HdfsScanner {
         return i;
       }
     }
-    DCHECK(false);
+    DCHECK(false) << "Runtime filter id " << filter_id << " not found!";
     return -1;
   }
 
diff --git 
a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java 
b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index c908e17d3..782d68581 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -778,9 +778,9 @@ public final class RuntimeFilterGenerator {
     /**
      * Return true if this filter is deemed highly selective, will arrive 
on-time, and
      * applied for all rows without ever being disabled by the scanner node.
-     * Mainly used by {@link 
ScanNode#reduceCardinalityByRuntimeFilter(java.util.Stack)}
-     * to lower the scan cardinality estimation. These properties are hard to 
predict
-     * during planning, so this method is set very conservative to avoid severe
+     * Mainly used by {@link 
ScanNode#reduceCardinalityByRuntimeFilter(java.util.Stack,
+     * double)} to lower the scan cardinality estimation. These properties are 
hard to
+     * predict during planning, so this method is set very conservative to 
avoid severe
      * underestimation.
      */
     public boolean isHighlySelective() {
@@ -888,8 +888,7 @@ public final class RuntimeFilterGenerator {
     RuntimeFilterGenerator filterGenerator = new RuntimeFilterGenerator(
         ctx.getQueryOptions());
     filterGenerator.generateFilters(ctx, plan);
-    List<RuntimeFilter> filters =
-        Lists.newArrayList(filterGenerator.getRuntimeFilters(ctx));
+    List<RuntimeFilter> filters = 
Lists.newArrayList(filterGenerator.getRuntimeFilters());
     if (filters.size() > maxNumBloomFilters) {
       // If more than 'maxNumBloomFilters' were generated, sort them by 
increasing
       // selectivity and keep the 'maxNumBloomFilters' most selective bloom 
filters.
@@ -1036,18 +1035,10 @@ public final class RuntimeFilterGenerator {
   /**
    * Returns a list of all the registered runtime filters, ordered by filter 
ID.
    */
-  public List<RuntimeFilter> getRuntimeFilters(PlannerContext ctx) {
+  public List<RuntimeFilter> getRuntimeFilters() {
     Set<RuntimeFilter> resultSet = new HashSet<>();
     for (List<RuntimeFilter> filters: runtimeFiltersByTid_.values()) {
-      for (RuntimeFilter filter : filters) {
-        if (ctx.getQueryOptions().isSetRuntime_filter_ids_to_skip()
-            && ctx.getQueryOptions().runtime_filter_ids_to_skip.contains(
-                filter.getFilterId().asInt())) {
-          // Skip this filter because it is explicitly excluded via query 
option.
-          continue;
-        }
-        resultSet.add(filter);
-      }
+      resultSet.addAll(filters);
     }
     List<RuntimeFilter> resultList = Lists.newArrayList(resultSet);
     Collections.sort(resultList, new Comparator<RuntimeFilter>() {
@@ -1112,7 +1103,7 @@ public final class RuntimeFilterGenerator {
               ctx.getRootAnalyzer(), conjunct, joinNode, filterType, 
filterSizeLimits_,
               /* isTimestampTruncation */ false, level);
           if (filter != null) {
-            registerRuntimeFilter(filter);
+            registerRuntimeFilter(ctx, filter);
             filters.add(filter);
           }
           // For timestamp bloom filters, we also generate a RuntimeFilter 
with the
@@ -1125,7 +1116,7 @@ public final class RuntimeFilterGenerator {
                 ctx.getRootAnalyzer(), conjunct, joinNode, filterType, 
filterSizeLimits_,
                 /* isTimestampTruncation */ true, level);
             if (filter2 == null) continue;
-            registerRuntimeFilter(filter2);
+            registerRuntimeFilter(ctx, filter2);
             filters.add(filter2);
           }
         }
@@ -1149,26 +1140,29 @@ public final class RuntimeFilterGenerator {
    * Registers a runtime filter with the tuple id of every scan node that is a 
candidate
    * destination node for that filter.
    */
-  private void registerRuntimeFilter(RuntimeFilter filter) {
+  private void registerRuntimeFilter(PlannerContext ctx, RuntimeFilter filter) 
{
     Map<TupleId, List<SlotId>> targetSlotsByTid = filter.getTargetSlots();
     Preconditions.checkState(targetSlotsByTid != null && 
!targetSlotsByTid.isEmpty());
     for (TupleId tupleId: targetSlotsByTid.keySet()) {
-      registerRuntimeFilter(filter, tupleId);
+      registerRuntimeFilter(ctx, filter, tupleId);
     }
   }
 
   /**
    * Registers a runtime filter with a specific target tuple id.
    */
-  private void registerRuntimeFilter(RuntimeFilter filter, TupleId targetTid) {
-    Preconditions.checkState(filter.getTargetSlots().containsKey(targetTid));
-    List<RuntimeFilter> filters = runtimeFiltersByTid_.get(targetTid);
-    if (filters == null) {
-      filters = new ArrayList<>();
-      runtimeFiltersByTid_.put(targetTid, filters);
+  private void registerRuntimeFilter(
+      PlannerContext ctx, RuntimeFilter filter, TupleId targetTid) {
+    Preconditions.checkArgument(filter.getTargetSlots().containsKey(targetTid),
+        "filter does not contain target slot!");
+    Preconditions.checkArgument(!filter.isFinalized(), "filter must not 
finalized yet!");
+    if (ctx.getQueryOptions().isSetRuntime_filter_ids_to_skip()
+        && ctx.getQueryOptions().getRuntime_filter_ids_to_skip().contains(
+            filter.getFilterId().asInt())) {
+      // Skip this filter because it is explicitly excluded via query option.
+      return;
     }
-    Preconditions.checkState(!filter.isFinalized());
-    filters.add(filter);
+    runtimeFiltersByTid_.computeIfAbsent(targetTid, t -> new 
ArrayList<>()).add(filter);
   }
 
   /**
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
 
b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
index d450bb1d0..bfbf19ac4 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/overlap_min_max_filters.test
@@ -185,6 +185,24 @@ aggregation(SUM, NumRuntimeFilteredPages): 255
 ====
 ---- QUERY
 ###################################################
+# Test that explicitly skipping minmax runtime filter does not crash backend.
+# Expect to see a total of 0 pages filtered out.
+###################################################
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+SET MINMAX_FILTER_THRESHOLD=1.0;
+SET MINMAX_FILTERING_LEVEL=PAGE;
+SET RUNTIME_FILTER_IDS_TO_SKIP=1;
+select straight_join count(*)
+from lineitem_sorted_l_extendedprice a join [SHUFFLE]
+lineitem_sorted_l_extendedprice b
+where a.l_extendedprice = b.l_extendedprice and b.l_orderkey = 1;
+---- RESULTS
+36
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRuntimeFilteredPages): 0
+====
+---- QUERY
+###################################################
 # Join the above sorted column with itself with
 # overlap filtering on sorted columns disabled.
 # Expect to see the same result with 0 pages

Reply via email to