Junqing Li created SPARK-51831:
----------------------------------

             Summary: No column pruning with existence join (EXISTS) and DataSource V2
                 Key: SPARK-51831
                 URL: https://issues.apache.org/jira/browse/SPARK-51831
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.5, 4.0.0
            Reporter: Junqing Li


While testing TPC-DS queries against DataSource V2, I noticed that column 
pruning is not applied in scenarios involving {{{}EXISTS 
(SELECT * FROM ... WHERE ...){}}}. As a result, the scan reads all 
columns instead of only the required ones. This is reproducible in 
queries such as Q10, Q16, Q35, Q69, and Q94.

The following is a simplified example that demonstrates the problem.
{code:java}
test("Test exists join with v2 source plan") {
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.internal.SQLConf
    withTempPath { dir =>
      spark.range(100)
        .withColumn("col1", col("id") + 1)
        .withColumn("col2", col("id") + 2)
        .withColumn("col3", col("id") + 3)
        .withColumn("col4", col("id") + 4)
        .withColumn("col5", col("id") + 5)
        .withColumn("col6", col("id") + 6)
        .withColumn("col7", col("id") + 7)
        .withColumn("col8", col("id") + 8)
        .withColumn("col9", col("id") + 9)
        .write
        .mode("overwrite")
        .parquet(dir.getCanonicalPath + "/t1")
      spark.range(10).write.mode("overwrite").parquet(dir.getCanonicalPath + "/t2")
      Seq("parquet", "").foreach { v1SourceList =>
        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1SourceList) {
          spark.read.parquet(dir.getCanonicalPath + "/t1").createOrReplaceTempView("t1")
          spark.read.parquet(dir.getCanonicalPath + "/t2").createOrReplaceTempView("t2")
          spark.sql(
            """
              |select sum(t1.id) as sum_id
              |from t1, t2
              |where t1.id == t2.id
              |      and exists(select * from t1 where t1.id == t2.id)
              |""".stripMargin).explain()
        }
      }
    }
  } {code}
After running the test, we can observe the following:
 * {*}With DataSource V1{*}, the query plan clearly shows column pruning: only the single required column ({{id}}) is read during the scan.
{code:java}
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=90]
   +- FileScan parquet [id#32L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint> {code}

 * {*}With DataSource V2{*}, the query plan shows that no column pruning is applied: all columns are read from the underlying data source.
{code:java}
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=152]
   +- Project [id#58L]
      +- BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0ad680b-573c-4b11-b1b5-f50ee38fa81a/t1[id#58L, col1#59L, col2#60L, col3#61L, col4#62L, col5#63L, col6#64L, col7#65L, col8#66L, col9#67L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<id:bigint,col1:bigint,col2:bigint,col3:bigint,col4:bigint,col5:bigint,col6:bigint,col7:big... RuntimeFilters: []{code}
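Beyond reading the {{explain()}} output by eye, the pruning behavior can be asserted programmatically. The following is a minimal sketch (not part of the original repro): it assumes the {{spark}} session and the {{t1}}/{{t2}} temp views from the test above, and inspects the physical plan's {{BatchScanExec}} nodes, whose underlying {{Scan.readSchema()}} reports the columns the V2 source will actually read.
{code:java}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

val df = spark.sql(
  """
    |select sum(t1.id) as sum_id
    |from t1, t2
    |where t1.id == t2.id
    |      and exists(select * from t1 where t1.id == t2.id)
    |""".stripMargin)

// Collect all DataSource V2 scan nodes from the physical plan.
val v2Scans = df.queryExecution.executedPlan.collect {
  case scan: BatchScanExec => scan
}

// With pruning working, each scan should read only `id`; with this bug,
// the scan of t1 reports all ten columns in its read schema.
v2Scans.foreach { scan =>
  assert(scan.scan.readSchema().fieldNames.sameElements(Array("id")))
} {code}
With the current behavior, this assertion fails for the {{t1}} scan because its read schema still contains {{col1}} through {{col9}}.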

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
