[ https://issues.apache.org/jira/browse/SPARK-50873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

KeKe Zhu updated SPARK-50873:
-----------------------------
    Description: 
I used Spark 3.5 + Iceberg 1.6.1 to run the TPC-DS benchmark. During performance 
analysis, I found a potential optimization for SparkOptimizer.

The optimization concerns column pruning for DataSource V2 (DSV2). 
In SparkOptimizer, DSV2 column pruning is performed by the 
V2ScanRelationPushDown rule. However, a series of optimization rules runs 
after V2ScanRelationPushDown; these rules may rewrite subqueries and generate 
Project or Filter operators that would allow further column pruning, but 
pruning is not executed again. As a result, the generated physical plan reads 
the entire table instead of only the required columns.
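
One way to observe this is to collect the DSV2 scans from the optimized plan and compare the columns each scan reads against what its parent operators need. Below is a hypothetical inspection snippet (it assumes a SparkSession named spark and the affected query text in a querySql variable, e.g. the TPC-DS query 16 shown below; neither is part of this ticket):
{code:scala}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

val optimized = spark.sql(querySql).queryExecution.optimizedPlan

// Print every DSV2 scan together with the columns it will actually read.
// After the RewriteSubquery batch, the scans under rewritten EXISTS /
// NOT EXISTS subqueries still report the full table schema here.
optimized.collect { case s: DataSourceV2ScanRelation => s }.foreach { s =>
  println(s"${s.relation.name} reads: ${s.output.map(_.name).mkString(", ")}")
}
{code}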

For example, consider query 16 from TPC-DS:
{code:java}
set spark.queryID=query16.tpl;
select
   count(distinct cs_order_number) as `order count`
  ,sum(cs_ext_ship_cost) as `total shipping cost`
  ,sum(cs_net_profit) as `total net profit`
from
   catalog_sales cs1
  ,date_dim
  ,customer_address
  ,call_center
where
    d_date between '2002-2-01' and
           (cast('2002-2-01' as date) + interval 60 days)
and cs1.cs_ship_date_sk = d_date_sk
and cs1.cs_ship_addr_sk = ca_address_sk
and ca_state = 'KS'
and cs1.cs_call_center_sk = cc_call_center_sk
and cc_county in ('Daviess County','Barrow County','Walker County',
                  'San Miguel County','Mobile County')
and exists (select *
            from catalog_sales cs2
            where cs1.cs_order_number = cs2.cs_order_number
              and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
and not exists(select *
               from catalog_returns cr1
               where cs1.cs_order_number = cr1.cr_order_number)
order by count(distinct cs_order_number)
limit 100; {code}
The final optimized plan of the query is shown in the picture below. Two 
tables (catalog_sales and catalog_returns) are read in full and only then 
projected: the rewritten EXISTS subquery needs only cs_order_number and 
cs_warehouse_sk, and the NOT EXISTS subquery needs only cr_order_number, yet 
both scans read every column. This certainly causes poor performance with 
Iceberg.

!query16-org.PNG!

My current solution: I wrote an optimization rule and added it to 
SparkOptimizer. With it, I now get the expected optimized plan and a much 
better performance result.

!query16-opt.PNG!
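
For reference, here is a heavily simplified sketch of the shape such a rule could take. This is not the exact rule from my patch: the rule name is invented, and it assumes no filters were already pushed into the scan being rebuilt (a real implementation has to re-apply them).
{code:scala}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.SupportsRead
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.types.{StructField, StructType}

object PruneV2ScanAfterSubqueryRewrite extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    // Fire only when a Project on top of a V2 scan references strictly
    // fewer columns than the scan currently reads.
    case p @ Project(_, scan: DataSourceV2ScanRelation)
        if p.references.subsetOf(scan.outputSet) &&
           p.references.size < scan.output.size =>
      val needed = scan.output.filter(p.references.contains)
      val prunedSchema = StructType(
        needed.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
      scan.relation.table match {
        case readable: SupportsRead =>
          readable.newScanBuilder(scan.relation.options) match {
            case builder: SupportsPushDownRequiredColumns =>
              // CAVEAT: a fresh ScanBuilder loses whatever filters
              // V2ScanRelationPushDown pushed into the old scan; this
              // sketch does not re-apply them.
              builder.pruneColumns(prunedSchema)
              p.copy(child = scan.copy(scan = builder.build(), output = needed))
            case _ => p
          }
        case _ => p
      }
  }
}

// extraOptimizations run in the "User Provided Optimizers" batch, which is
// placed after the RewriteSubquery batch, so the rule sees rewritten subqueries.
spark.experimental.extraOptimizations ++= Seq(PruneV2ScanAfterSubqueryRewrite)
{code}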

Is there any other solution to this problem? Feel free to contact me anytime.

> An optimization for SparkOptimizer to prune columns in subqueries
> -----------------------------------------------------------------
>
>                 Key: SPARK-50873
>                 URL: https://issues.apache.org/jira/browse/SPARK-50873
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.3
>            Reporter: KeKe Zhu
>            Priority: Major
>         Attachments: query16-opt.PNG, query16-org.PNG
>
>


