A SQL query planner can be intelligent enough to push filter operations down toward the storage layer. If we optimize the query planner so that I/O to storage is reduced at the cost of running some filters more than once (i.e., extra compute), this trade-off is desirable whenever the system is I/O bound. The example below, from the TPC-H benchmark, proves the point.
Let's look at query q19 of the TPC-H benchmark:

    select
        sum(l_extendedprice * (1 - l_discount)) as revenue
    from
        lineitem,
        part
    where
        (p_partkey = l_partkey
         and p_brand = 'Brand#12'
         and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
         and l_quantity >= 1 and l_quantity <= 1 + 10
         and p_size between 1 and 5
         and l_shipmode in ('AIR', 'AIR REG')
         and l_shipinstruct = 'DELIVER IN PERSON')
        or
        (p_partkey = l_partkey
         and p_brand = 'Brand#23'
         and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
         and l_quantity >= 10 and l_quantity <= 10 + 10
         and p_size between 1 and 10
         and l_shipmode in ('AIR', 'AIR REG')
         and l_shipinstruct = 'DELIVER IN PERSON')
        or
        (p_partkey = l_partkey
         and p_brand = 'Brand#34'
         and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
         and l_quantity >= 20 and l_quantity <= 20 + 10
         and p_size between 1 and 15
         and l_shipmode in ('AIR', 'AIR REG')
         and l_shipinstruct = 'DELIVER IN PERSON')

The latest version of Spark produces the following plan (not verbatim; edited for readability) to execute q19:

    Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
      Project [l_extendedprice, l_discount]
        Join Inner, Some(((p_partkey = l_partkey) &&
            ((((((p_brand = Brand#12) && p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (l_quantity >= 1.0)) && (l_quantity <= 11.0)) && (p_size <= 5))
            || (((((p_brand = Brand#23) && p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (l_quantity >= 10.0)) && (l_quantity <= 20.0)) && (p_size <= 10))
            || (((((p_brand = Brand#34) && p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (l_quantity >= 20.0)) && (l_quantity <= 30.0)) && (p_size <= 15)))))
          Project [l_partkey, l_quantity, l_extendedprice, l_discount]
            Filter (isnotnull(l_partkey) && (isnotnull(l_shipinstruct) && (l_shipmode IN (AIR,AIR REG) && (l_shipinstruct = DELIVER IN PERSON))))
              LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], MapPartitionsRDD[316]
          Project [p_partkey, p_brand, p_size, p_container]
            Filter (isnotnull(p_partkey) && (isnotnull(p_size) && (cast(cast(p_size as decimal(20,0)) as int) >= 1)))
              LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment], MapPartitionsRDD[314]

As you can see, only three filters are pushed down below the join:

    l_shipmode IN (AIR,AIR REG)
    l_shipinstruct = DELIVER IN PERSON
    cast(cast(p_size as decimal(20,0)) as int) >= 1

and the following filters are applied during the join itself:

    p_brand = Brand#12
    p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
    l_quantity >= 1.0 && l_quantity <= 11.0
    p_size <= 5

    p_brand = Brand#23
    p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)
    l_quantity >= 10.0 && l_quantity <= 20.0
    p_size <= 10

    p_brand = Brand#34
    p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)
    l_quantity >= 20.0 && l_quantity <= 30.0
    p_size <= 15
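(If you want to reproduce this plan inspection yourself, something like the sketch below should work. It assumes lineitem and part are already registered as temp tables and that a value q19Sql holds the query text above; the exact plan output varies by Spark version.)

    // q19Sql is assumed to hold the q19 query text shown above.
    val q19Df = sqlContext.sql(q19Sql)

    // Print the parsed, analyzed, optimized, and physical plans in one shot.
    q19Df.explain(true)

    // Or inspect just the optimized logical plan, where the Filter/Join
    // placement discussed above is visible.
    println(q19Df.queryExecution.optimizedPlan)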
Now let's look at the following sequence of SQL commands, which produces the same result:

    val partDfFilter = sqlContext.sql("""
      |select p_brand, p_partkey from part
      |where
      |  (p_brand = 'Brand#12'
      |   and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
      |   and p_size between 1 and 5)
      |  or
      |  (p_brand = 'Brand#23'
      |   and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
      |   and p_size between 1 and 10)
      |  or
      |  (p_brand = 'Brand#34'
      |   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
      |   and p_size between 1 and 15)
      """.stripMargin)

    val itemLineDfFilter = sqlContext.sql("""
      |select
      |  l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
      |where
      |  (l_quantity >= 1 and l_quantity <= 30
      |   and l_shipmode in ('AIR', 'AIR REG')
      |   and l_shipinstruct = 'DELIVER IN PERSON')
      """.stripMargin)

    partDfFilter.registerTempTable("partFilter")
    itemLineDfFilter.registerTempTable("lineitemFilter")

    val q19Query = """
      |select
      |  sum(l_extendedprice * (1 - l_discount)) as revenue
      |from
      |  lineitemFilter,
      |  partFilter
      |where
      |  (p_partkey = l_partkey
      |   and p_brand = 'Brand#12'
      |   and l_quantity >= 1 and l_quantity <= 1 + 10)
      |  or
      |  (p_partkey = l_partkey
      |   and p_brand = 'Brand#23'
      |   and l_quantity >= 10 and l_quantity <= 10 + 10)
      |  or
      |  (p_partkey = l_partkey
      |   and p_brand = 'Brand#34'
      |   and l_quantity >= 20 and l_quantity <= 20 + 10)
      """.stripMargin

The following plan shows how Spark will execute the new q19 query:

    Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
      Project [l_extendedprice, l_discount]
        Join Inner, Some(((p_partkey = l_partkey) &&
            (((((p_brand = Brand#12) && (l_quantity >= 1.0)) && (l_quantity <= 11.0))
            || (((p_brand = Brand#23) && (l_quantity >= 10.0)) && (l_quantity <= 20.0)))
            || (((p_brand = Brand#34) && (l_quantity >= 20.0)) && (l_quantity <= 30.0)))))
          Project [l_partkey, l_quantity, l_extendedprice, l_discount]
            Filter (isnotnull(l_partkey) && ((isnotnull(l_shipinstruct) && isnotnull(l_quantity)) && (((cast(l_quantity as float) >= 1.0) && (cast(l_quantity as float) <= 30.0)) && (l_shipmode IN (AIR,AIR REG) && (l_shipinstruct = DELIVER IN PERSON)))))
              LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], MapPartitionsRDD[316]
          Project [p_partkey, p_brand, p_size, p_container]
            Filter ((isnotnull(p_partkey) && isnotnull(cast(cast(p_partkey as decimal(20,0)) as int))) && (isnotnull(p_size) && ((cast(cast(p_size as decimal(20,0)) as int) >= 1) && (((((p_brand = Brand#12) && p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) && (cast(cast(p_size as decimal(20,0)) as int) <= 5)) || (((p_brand = Brand#23) && p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) && (cast(cast(p_size as decimal(20,0)) as int) <= 10))) || (((p_brand = Brand#34) && p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) && (cast(cast(p_size as decimal(20,0)) as int) <= 15))))))
              LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment], MapPartitionsRDD[314]

With the new approach, all of the filters are pushed down below the join:

    l_shipmode IN (AIR,AIR REG)
    l_shipinstruct = DELIVER IN PERSON
    cast(cast(p_size as decimal(20,0)) as int) >= 1
    p_brand = Brand#12
    p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
    l_quantity >= 1.0 && l_quantity <= 11.0
    p_size <= 5
    p_brand = Brand#23
    p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)
    l_quantity >= 10.0 && l_quantity <= 20.0
    p_size <= 10
    p_brand = Brand#34
    p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)
    l_quantity >= 20.0 && l_quantity <= 30.0
    p_size <= 15
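(For what it's worth, the same manual pre-filtering can also be sketched with the DataFrame API instead of temp tables. This is just an illustrative equivalent under my own assumptions; the table and column names are TPC-H, but the rest is not how the plans above were produced.)

    import org.apache.spark.sql.functions.{expr, sum}

    // Pre-filter each table so the part-only and lineitem-only predicates
    // run at scan time, before the join.
    val partFiltered = sqlContext.table("part").filter(
      """(p_brand = 'Brand#12' and p_container in ('SM CASE','SM BOX','SM PACK','SM PKG') and p_size between 1 and 5)
        |or (p_brand = 'Brand#23' and p_container in ('MED BAG','MED BOX','MED PKG','MED PACK') and p_size between 1 and 10)
        |or (p_brand = 'Brand#34' and p_container in ('LG CASE','LG BOX','LG PACK','LG PKG') and p_size between 1 and 15)
        |""".stripMargin)

    val lineitemFiltered = sqlContext.table("lineitem").filter(
      "l_quantity >= 1 and l_quantity <= 30 " +
      "and l_shipmode in ('AIR','AIR REG') and l_shipinstruct = 'DELIVER IN PERSON'")

    // Only the brand/quantity pairing logic is left in the join condition.
    val revenue = lineitemFiltered
      .join(partFiltered, expr(
        """p_partkey = l_partkey and (
          |  (p_brand = 'Brand#12' and l_quantity between 1 and 11) or
          |  (p_brand = 'Brand#23' and l_quantity between 10 and 20) or
          |  (p_brand = 'Brand#34' and l_quantity between 20 and 30))
          |""".stripMargin))
      .agg(sum(expr("l_extendedprice * (1 - l_discount)")).as("revenue"))

    revenue.show()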
However, some filters still need to be executed during the join to distinguish the different sets of items; in other words, some filters are re-evaluated:

    p_brand = Brand#12
    l_quantity >= 1.0 && l_quantity <= 11.0
    p_brand = Brand#23
    l_quantity >= 10.0 && l_quantity <= 20.0
    p_brand = Brand#34
    l_quantity >= 20.0 && l_quantity <= 30.0

Our main goal in pushing filters down as far as possible is to minimize I/O and maximize processor utilization. So let's compare the original q19 and the modified q19 from an I/O point of view:

    +--------+-------+----------------------------------------+----------------------------------------+
    | TPCH   | Stage |                  Q19                   |              Q19 modified              |
    | Scale  |       +---------+--------------+---------------+---------+--------------+---------------+
    | Factor |       | Input   | Shuffle Read | Shuffle Write | Input   | Shuffle Read | Shuffle Write |
    +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
    | 1      | 1     | 724 MB  |              | 4.2 MB        | 724 MB  |              | 2.7 MB        |
    | 1      | 2     | 23.0 MB |              | 4.0 MB        | 23.0 MB |              | 22.9 KB       |
    | 1      | 3     |         | 8.2 MB       | 11.0 KB       |         | 2.7 MB       | 11.0 KB       |
    | 1      | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
    | 10     | 1     | 7.2 GB  |              | 43.5 MB       | 7.2 GB  |              | 28.0 MB       |
    | 10     | 2     | 232 MB  |              | 39.1 MB       | 232 MB  |              | 146.2 KB      |
    | 10     | 3     |         | 82.5 MB      | 11.0 KB       |         | 28.1 MB      | 11.0 KB       |
    | 10     | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
    | 100    | 1     | 74.1 GB |              | 448 MB        | 74.1 GB |              | 266 MB        |
    | 100    | 2     | 2.3 GB  |              | 385 MB        | 2.3 GB  |              | 1570 KB       |
    | 100    | 3     |         | 834 MB       | 11.0 KB       |         | 288 MB       | 11.0 KB       |
    | 100    | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
    +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
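(As a side note, per-stage numbers like these can be read off the Spark web UI, but if someone wants to capture them programmatically, a listener along these lines should work; the metric field names here are from the Spark 2.x listener API and may differ in other versions.)

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Print aggregated input / shuffle bytes for every completed stage so
    // the two query variants can be compared without the web UI.
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        val info = event.stageInfo
        val m = info.taskMetrics
        println(s"stage ${info.stageId}: " +
          s"input=${m.inputMetrics.bytesRead} B, " +
          s"shuffleRead=${m.shuffleReadMetrics.totalBytesRead} B, " +
          s"shuffleWrite=${m.shuffleWriteMetrics.bytesWritten} B")
      }
    })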
The reduction in shuffle read and write amplification for each scale factor is shown in the following table (Rate = Q19 shuffle data / Q19 modified shuffle data):

    +-------------------+------------------+---------------------------+------+
    | TPCH Scale Factor | Q19 Shuffle Data | Q19 Modified Shuffle Data | Rate |
    +-------------------+------------------+---------------------------+------+
    | 1                 | 8.211 MB         | 2.733 MB                  | 3.00 |
    | 10                | 82.611 MB        | 28.157 MB                 | 2.93 |
    | 100               | 834.311 MB       | 288.081 MB                | 2.89 |
    +-------------------+------------------+---------------------------+------+

So, as you can see, shuffle read and write amplification can be reduced by roughly a factor of 3 if we push more intelligence toward the storage layer.