xinyiZzz opened a new issue #6116:
URL: https://github.com/apache/incubator-doris/issues/6116


   # Runtime Filtering for Doris[zh-CN]
   Runtime 
filter的概念、在Doris中的现状、BE的实现思路,见:https://github.com/apache/incubator-doris/issues/6075
   Impala runtime filter的源码解析见: 
https://docs.google.com/document/d/1MbtorqoXqowW5JQZOtJtDs4lsDMsWK-IaviPvM3n0PI/edit
 
   Runtime filter调研及设计思路讨论见: 
https://docs.google.com/document/d/1M--pTDShnGwa5zd-CKLhBdeg8ILgjRo27RVXc-tv9g8/edit
 
   Doris 目前的runtime filter与Impala在设计目标和具体实现上均有不同。
   - 位于FE的runtime filter生成流程参考Impala的实现,主要修改了生成规则、合并策略、配置方法等逻辑;
   - 位于BE的runtime filter构建和应用基于Doris现有逻辑实现,与Impala有较大不同;
   
   ### 名词解释
   (待补充)
   
   ### Runtime Filter 设计目标
   Runtime filter是**Doris 0.14.12**及更高版本提供的优化feature,主要用于优化针对大表的join。
   
   当join只用到表中的一小部分数据时,runtime 
filter在查询运行时解析join条件确定下推的表达式,并将该信息广播给所有正在读取数据的BE节点,通过减少join left child中scan 
node读取的数据量,从而减少join probe的数据量和shuffle网络传输的数据量(仅Shuffle Join),进而避免不必要的I/O和网络传输。
   
   ### Runtime Filter 使用场景
   目前 Runtime Filter 支持的Join类型包括**BoardCast Join、Shuffle Join、Colocation 
Join、Bucket Shuffle Join**。
   
   ### Runtime Filter 类型
   - **`BLOOM_FILTER`**: 有一定的误判率,导致过滤的数据比预期少一点,但不会导致最终结果不准确。
   - **`MIN_MAX`**: 包含最大值和最小值,从而过滤小于最小值和大于最大值的数据,主要用于数字列。
   - **IN**: 下推IN表达式到scan node,默认只有join 
build行数少于1024才会下推(可通过查询选项中的`RUNTIME_FILTER_MAX_IN_NUM`调整),小数据量下性能高。
   
   ### Runtime Filter 内部结构
   Runtime filter由**equi-join predicate**生成,其中predicate的right expr被称为**src 
expr**,用于构建runtime filter,并且可以与join right child中任意数量的tuple ID绑定;left 
expr被称为**target expr**,用于应用runtime filter,并且必须与join left child中某个tuple 
ID绑定。runtime filter中记录着构建runtime filter的join node,和应用runtime filter的scan node。
   
   在plan fragments之间传输的filter**本质是join predicate中key列的值列表**,当这个值列表及时传输到scan 
node时,Doris可以应用它们立即过滤掉不匹配的值,而不是将原始数据传输到join node后进行hash table probe。
   
   Broadcast Join和Shuffle Join的runtime filter构建和应用的行为不同。Broadcast 
Join构建的filter包含join predicate中key列的完整值列表,可以直接交给位于**同一个fragment的join left 
child的scan node**应用;Shuffle Join中每个hash join node构建的filter仅包含集群中这个hash join 
node处理的值,需要将所有hash join node构建的filter**合并**为一个,下推给位于**不同fragment的join left 
child的 scan node**,使用合并后的filter可以准确的过滤从存储中读取的数据,如果filter不合并则scan node扫描时会丢失数据。
   
   需要注意的是如果Broadcast Join的left child是个Shuffle Join,那么这个Broadcast 
Join生成的filter下推到Shuffle Join left child的scan 
node时需要跨fragment,所以仍需合并后下推,即**filter是否需要合并的判断条件是构建filter的join 
node和应用filter的scan node是否位于不同fragment**;
   
   ### Runtime Filter 生成规则及合并策略
   ##### 生成规则
   1. 只支持对binary equality predicates生成runtime filter,不包括Null-safe 
predicates,因为filter可能会过滤掉left child的null值。
   2. 不支持将filter下推到left outer、full outer、anti join的左侧;
   3. 不支持join left child或者right child是常量;
   4. 不支持src expr和target expr相等;
   5. 不支持src expr的类型等于`HLL`或者`BITMAP`;
   6. 不支持target expr包含NULL-checking表达式,包括`COALESCE/IFNULL/CASE`,因为当outer 
join上层其他join的join condition包含NULL-checking表达式,并通过这个表达式生成filter下推到这个outer 
join的左侧时可能导致结果不正确;
   7. 不支持target expr的每个slot无法在某个base table的tuple中找到并绑定某个等价的slot;
   8. 目前仅支持将filter下推给`OlapScanNode`;
   9. 不支持列传导,即不支持推给多个scan node,这包含两种情况,一是例如on clause包含A.k = B.k and B.k = 
C.k,目前C.k只可以推给B.k,而不可以下推给A.k;二是例如on clause包含A.a + B.b = 
C.c,如果A.a可以列传导到B.a,即A.a和B.a是等价的列,那么可以用B.a替换A.a,然后计算B的targetExpr后尝试将filter下推给B;
   10. Target expr和src expr的类型必须相等,因为`BLOOM_FILTER`是基于hash的,若不相等会尝试将target type 
castTo src type;
   11. Target expr的所有slot都必须是key列;
   12. 
不支持`PlanNode.Conjuncts`生成的filter下推,与`HashJoinNode`的`eqJoinConjuncts`和`otherJoinConjuncts`不同,在之前的测试中发现可能会导致错误的结果,例如,当`IN`子查询转换为join时,自动生成的conjunct将保存在
 `PlanNode.conjuncts`中。这时候生成并应用runtime filter可能会导致结果缺少一些行。
   
   ##### 合并策略
   首先遍历每个plan fragment,建立runtime filter ID和fragment instance addr的对应关系,选择plan 
tree最上面的`fragment 0`中的instance作为filter合并节点,Doris中`fragment 
0`有且仅有一个instance,且是query最后一个被销毁的instance。
   
   ### Runtime Filtering 查询选项
   有关控制Runtime Filter的查询选项(session 
variable)请参阅以下部分,除`RUNTIME_FILTER_MODE`和`RUNTIME_FILTER_TYPE`外,其他查询选项用于调整runtime
 filter在特定场景下达到最佳性能,通常只在性能测试后进行调整,从而针对某个查询进行优化。
   
   ##### RUNTIME_FILTER_MODE
   `RUNTIME_FILTER_MODE`查询选项: 用于调整runtime filter的下推策略,包括LOCAL和GLOBAL两种策略。
   
   LOCAL策略下构建的filter只能在同一个instance上后续的plan fragment中使用,即Broadcast 
Join的一般场景;GLOBAL策略下构建的filter除了满足LOCAL策略外,存在Shuffle 
Join的场景下,还可以将filter合并后通过网络传输到不同instance上的plan fragment中使用。具体可见上述对Broadcast 
Join和Shuffle Join应用runtime filter的分析。
   
   
LOCAL策略相对保守,即filter仅在生成它们的同一个instance上使用,GLOBAL策略相对激进,即允许filter通过网络传输。默认设置为GLOBAL策略,这可以在广泛的场景对查询进行有效的优化,而无需显式的设置其他查询选项。
   
   ##### RUNTIME_FILTER_TYPE
   `RUNTIME_FILTER_TYPE` 查询选项: 使用的runtime 
filter类型,包括`BLOOM_FILTER、IN、MIN_MAX`,使用多个时用逗号分隔,注意需要加引号,例如`set 
runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";`。
   
   默认设置为只使用`IN`,目前`IN`没有实现filter合并,即无法跨fragment下推,所以目前当需要将filter下推给Shuffle Join 
left 
child的`OLAP_SCAN_NODE`时,如果没有生成`BLOOM_FILTER`,那么我们会将`IN`转为`BLOOM_FILTER`,用于处理跨fragment下推,所以即使类型只选择了`IN`,实际也可能应用了`BLOOM_FILTER`;
   
   通常情况下使用全部的filter `BLOOM_FILTER、IN、MIN_MAX`时性能更高。
   
   ##### RUNTIME_FILTER_WAIT_TIME_MS
   `RUNTIME_FILTER_WAIT_TIME_MS`查询选项: 以毫秒为单位指定join left child的scan 
node读取数据前等待runtime filter的最长时间,默认会为每个runtime filter最多等待1s(1000ms),即如果scan 
node被分配了3个runtime filter,那么它最多会等待3s,scan node会尝试将等待时间内到达的runtime filter下推到存储引擎。
   
   因为生成和构建runtime filter需要时间,需要进行合并的runtime filter往往需要时间更长。等待超过指定时间后scan 
node会使用已经到达的runtime filter开始读取数据,若没有runtime filter到达则会直接开始读取数据.
   
   如果runtime filter在scan node开始读取数据后到达,则scan node不会将该runtime 
filter下推到存储引擎,而是在scan node上对之后读取到的数据应用该runtime filter进行表达式过滤,但不会将该runtime 
filter应用于已读取的数据,这样得到的中间数据规模会大于最优解,但在部分情况下这样可以避免严重的裂化。
   
   
如果集群比较繁忙,并且集群上有许多资源密集型或长耗时的查询,可以考虑增加等待时间,以避免复杂查询错过优化机会。如果集群负载较轻,并且集群上有许多只需要几秒的小查询,可以考虑减少等待时间,以避免每个查询增加1s的延迟。
   
   ##### RUNTIME_FILTERS_MAX_NUM
   `RUNTIME_FILTERS_MAX_NUM`查询选项: 每个查询可应用的runtime 
filter中`BLOOM_FILTER`的最大数量(`BLOOM_FILTER`的构建和应用代价较高),默认10,如果生成的 
`BLOOM_FILTER`超过最大数量,则保留选择性大的前`RUNTIME_FILTERS_MAX_NUM`个`BLOOM_FILTER`。
   
   选择性=(JoinNode Cardinality / JoinNode left child 
Cardinality)。需要注意的是,因为目前Doris中FE拿到的表的统计信息中数据行数(Cardinality)不准,所以这里`BLOOM_FILTER`计算的选择性与实际不准,因此最终可能只是随机保留了`RUNTIME_FILTERS_MAX_NUM`个`BLOOM_FILTER`。
   
   目前仅对`BLOOM_FILTER`的数量进行限制,因为相比其他filter它的构建和应用代价更高;
   
   ##### RUNTIME_BLOOM_FILTER_SIZE 相关参数
   - `RUNTIME_BLOOM_FILTER_MIN_SIZE`查询选项: runtime 
filter中`BLOOM_FILTER`的最小长度(以字节为单位),默认1048576(1M);
   
   - `RUNTIME_BLOOM_FILTER_MAX_SIZE`查询选项: runtime 
filter中`BLOOM_FILTER`的最大长度(以字节为单位),默认16777216(16M);
   
   - `RUNTIME_BLOOM_FILTER_SIZE` 查询选项: runtime 
filter中`BLOOM_FILTER`的默认长度(以字节为单位),默认2097152(2M);
   
   如果能拿到join right 
child中表的统计信息的数据行数(Cardinality),会尝试将Cardinality作为`NDV`,默认误检率`fpp`等于0.05,计算包含`NDV`个唯一元素且误检率低于`fpp`的bloom
 
filter所需的最小字节数,将四舍五入到最接近的2的幂(以2为底的log值),且通过`RUNTIME_BLOOM_FILTER_MIN_SIZE`和`RUNTIME_BLOOM_FILTER_MAX_SIZE`限制最终使用的bloom
 filter长度。
   
   如果无法拿到join right child的Cardinality,则会使用默认的bloom 
filter长度`RUNTIME_BLOOM_FILTER_SIZE`。
   
   ##### RUNTIME_FILTER_MAX_IN_NUM
   `RUNTIME_FILTER_MAX_IN_NUM` 查询选项: 如果hash join right child的join 
key列的行数大于这个值,我们将忽略`IN` filter,默认1024;
   
   ### runtime filter explain 说明
   EXPLAIN statement可以显示的查询计划中包括每个plan fragment使用的predicates信息,以及plan 
fragment生成和使用runtime filter的注释。
   
   生成runtime filter的plan fragment包含的注释例如 `runtime filters: filter_id[type] <- 
table.column`。
   使用runtime filter的plan fragment包含的注释例如`runtime filters: filter_id[type] -> 
table.column`。
   
   下面例子中的查询使用了一个ID为RF000的runtime filter。
   ``
   CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2 
PROPERTIES("replication_num" = "1");
   INSERT INTO test VALUES (1), (2), (3), (4);
   
   CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2 
PROPERTIES("replication_num" = "1");
   INSERT INTO test2 VALUES (3), (4), (5);
   
   EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
   +-------------------------------------------------------------------+
   | Explain String                                                    |
   +-------------------------------------------------------------------+
   | PLAN FRAGMENT 0                                                   |
   |  OUTPUT EXPRS:`t1`                                                |
   |                                                                   |
   |   4:EXCHANGE                                                      |
   |                                                                   |
   | PLAN FRAGMENT 1                                                   |
   |  OUTPUT EXPRS:                                                    |
   |   PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test`.`t1`  |
   |                                                                   |
   |   2:HASH JOIN                                                     |
   |   |  join op: INNER JOIN (BUCKET_SHUFFLE)                         |
   |   |  equal join conjunct: `test`.`t1` = `test2`.`t2`              |
   |   |  runtime filters: RF000[in] <- `test2`.`t2`                   |
   |   |                                                               |
   |   |----3:EXCHANGE                                                 |
   |   |                                                               |
   |   0:OlapScanNode                                                  |
   |      TABLE: test                                                  |
   |      runtime filters: RF000[in] -> `test`.`t1`                    |
   |                                                                   |
   | PLAN FRAGMENT 2                                                   |
   |  OUTPUT EXPRS:                                                    |
   |   PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test2`.`t2` |
   |                                                                   |
   |   1:OlapScanNode                                                  |
   |      TABLE: test2                                                 |
   +-------------------------------------------------------------------+
   -- 上面`runtime filters`的行显示了`PLAN FRAGMENT 1`的`2:HASH JOIN`生成了ID为RF000的
   -- `IN`类型的filter,`test2`.`t2`的key values仅在运行时可知,在`0:OlapScanNode`
   -- 使用了该filter用于在读取`test`.`t1`时过滤不必要的数据。
   
   SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2; 
   -- 返回2行结果[3, 4];
   
   -- 通过query的profile(set is_report_success=true;)可以查看EXPLAIN plan和查询内部工作的
   -- 详细信息,包括每个filter是否下推、等待耗时、以及OLAP_SCAN_NODE从prepare到接收到filter的总时长。
   RuntimeFilter:in:
       -  HasPushDownToEngine:  true
       -  AWaitTimeCost:  0ns
       -  EffectTimeCost:  2.76ms
   
   -- 此外,在profile的OLAP_SCAN_NODE中还可以查看runtime filter下推后的过滤效果和耗时。
       -  RowsVectorPredFiltered:  9.320008M  (9320008)
       -  VectorPredEvalTime:  364.39ms
   ``   
   
   ### 一些应用Runtime Filter后性能提升的案例
   (待补充)
   
   ### 调优和故障排查
   针对资源密集型、运行耗时足够长且频率足够高的查询,可以参照这部分调优方法去单独优化它们。
   
   1. 针对指定查询,通过EXPLAIN statement查看`runtime filters:`的注释,确认是否将runtime 
filters应用到了期望的join clauses上,例如runtime filters不适用于使用了non-equijoin运算符的查询。
   2. 不同过滤率下runtime filter对性能提升的效果: 
   Runtime 
filter的过滤率越大,即过滤效果越好,对性能提升越大,大多数情况下,只要能过滤部分数据对性能均有一定程度提升,极端情况下过滤率为0%时,即左右表join 
condition的key列取值完全相同时,性能会降低10%左右,所以若预期过滤效果较差,可考虑设置查询选项`RUNTIME_FILTER_MODE=OFF`。
   3. 不同类型的runtime filter同时使用:
   
如果多个filter可过滤的数据互相包含,此时性能比仅使用一种filter要差,即一种filter先过滤后,其他类型的filter实际没起作用(或过滤很少),带来额外开销;
   4. Filter下推对性能的影响: 
   
目前`MIN_MAX`和`IN`均可下推至存储引擎,`BLOOM_FILTER`仅可对Key列下推至存储引擎,实际测试中将filter下推到存储引擎相比在scan
 
node上使用表达式计算对性能的提升大很多,且`BLOOM_FILTER`不下推时大多数情况会降低性能,所以对非Key列使用`BLOOM_FILTER`可能效果不尽如人意。
   5. `BLOOM_FILTER`不同长度对性能的影响: 
   当join right 
child数据量大导致`BLOOM_FILTER`存在较多假阳性时,随着`BLOOM_FILTER`长度增大,过滤率随之增大,同时`BLOOM_FILTER`构建和合并的时间变长,部分case中bloom
 filte长度为2M时性能最好;
   6. 大表join keys包含unique列时:
   如果涉及大表的join查询使用了unique列作为join keys,则生成和传输filter的开销可能超过其性能收益,而且key 
values太多会导致`BLOOM_FILTER`的误检率过大,从而无法有效减少left 
child读取的数据量,对于此类查询,可以考虑设置查询选项`RUNTIME_FILTER_MODE=OFF`。
   7. Join keys基数大小不同对runtime filter的影响: 
   Inner Join中,join keys基数越小,即count(distinct join keys)越小,runtime 
filter对性能提升越大,这是因为Doris中hash table采用链表法处理hash冲突,当join 
keys基数小时hash冲突严重,此时runtime filter减少join probe的数据量对性能提升很大;
   8. 是否等待runtime filter性能对比: 
实际测试中等待filter到达并下推到存储引擎后再开始Scan,比直接Scan后等filter到达后再使用(无法下推到存储引擎)的性能要好。如果等待runtime
 filter的生成需要几秒,但是使用runtime filter修剪掉不必要的数据后可能可以节省几分钟,那么额外等待几秒钟也是值得的。
   9. Runtime filter在Broadcast Join、Shuffle Join、Bucket Shuffle Join上性能提升的效果对比:
       - 部分情况下runtime filter在Shuffle Join上的性能提升高于Broadcast 
Join,原因是构建filter耗时的影响,Broadcast Join中每个BE都要用右表所有数据构建filter;Shuffle 
Join中每个BE只需用右表中1/N的数据构建,filter合并带来的网络耗时通常可忽略不计,所以构建速度更快;
       - 部分情况下Bucket Shuflle Join相比Broadcast Join和Shuffle 
Join在`BLOOM_FILTER`长度相同时过滤率更高。原因是Broadcast Join和Shuffle Join每个scan 
node使用的`BLOOM_FILTER`包含右表的所有数据,而Bucket Shuflle Join减少了右表shuffle的数据量,每个scan 
node使用的`BLOOM_FILTER`只包含右表N分之一的数据,所以`BLOOM_FILTER`假阳性率更低;
       - 但在某些case下,runtime filter在Shuffle Join上的效果要好于Bucket Shuflle 
Join,原因是Bucket Shuflle Join存在数据倾斜;
   10. Join keys类型不同对runtime filter的影响: 
   大多数情况下,当join keys类型为int/bigint/double等时,`MIN_MAX` 
filter的过滤效果和左右表数据分布有关,当join keys类型为varchar时`MIN_MAX` 
filter过滤效果差,`BLOOM_FILTER`受join keys类型影响小;
   
   ### TODO
   1. 支持列传导,具体见生成规则中对列传导的说明;
   2. 
支持`PlanNode.Conjuncts`生成的filter下推。需要确认2个问题,`PlanNode.Conjuncts`生成的filter是否要限制join类型;都有哪些情况下会生成`PlanNode.Conjuncts`,目前已知`in`子查询转换为join时生成;
   3. 
支持FE获取数据表准确的cardinality。因为需要保证每个BE的`BLOOM_FILTER`长度相同才能合并,所以目前在FE计算`BLOOM_FILTER`的长度,但FE拿到的cardinality不准,所以通过join
 right child的Cardinality和`fpp`计算的bloom 
filter长度不准。而且受数据倾斜影响,统一`BLOOM_FILTER`长度也可能存在问题;
   4. 增加是否仅允许Key列生成runtime filter的开关,因为`BLOOM_FILTER`只有Key列才能下推;
   5. `BLOOM_FILTER`下推后若误检率过高,增加Always True的逻辑,即不继续使用`BLOOM_FILTER`;
   6. 当join probe(left) side数据量太小时不生成runtime filter,当join build(right) 
side数据量太大时不生成RuntimeFilter;
   
   ### 一些测试结论
   1. Runtime Filter SSB单机性能测试: 
https://docs.google.com/document/d/1Jc48T4aKxFv_QgFJjsQwVgkXuh9Re4kiAhEWBS8R1MI/edit?usp=sharing
   2. Runtime Filter TPC-DS测试: (待补充)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to