Junqing Li created SPARK-51831:
----------------------------------

             Summary: No Column Pruning while using ExistJoin and datasource v2
                 Key: SPARK-51831
                 URL: https://issues.apache.org/jira/browse/SPARK-51831
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.5, 4.0.0
            Reporter: Junqing Li
Recently, I have been testing TPC-DS queries on DataSource V2 and noticed that column pruning does not occur in scenarios involving {{EXISTS (SELECT * FROM ... WHERE ...)}}. As a result, the scan ends up reading all columns instead of just the required ones. The issue is reproducible in queries such as Q10, Q16, Q35, Q69, and Q94. The following simplified test demonstrates the problem.

{code:java}
test("Test exist join with v2 source plan") {
  import org.apache.spark.sql.functions._

  withTempPath { dir =>
    spark.range(100)
      .withColumn("col1", col("id") + 1)
      .withColumn("col2", col("id") + 2)
      .withColumn("col3", col("id") + 3)
      .withColumn("col4", col("id") + 4)
      .withColumn("col5", col("id") + 5)
      .withColumn("col6", col("id") + 6)
      .withColumn("col7", col("id") + 7)
      .withColumn("col8", col("id") + 8)
      .withColumn("col9", col("id") + 9)
      .write
      .mode("overwrite")
      .parquet(dir.getCanonicalPath + "/t1")
    spark.range(10).write.mode("overwrite").parquet(dir.getCanonicalPath + "/t2")

    // Run once with the V1 Parquet source and once with V2 (empty fallback list).
    Seq("parquet", "").foreach { v1SourceList =>
      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1SourceList) {
        spark.read.parquet(dir.getCanonicalPath + "/t1").createOrReplaceTempView("t1")
        spark.read.parquet(dir.getCanonicalPath + "/t2").createOrReplaceTempView("t2")
        spark.sql(
          """
            |select sum(t1.id) as sum_id
            |from t1, t2
            |where t1.id == t2.id
            |  and exists(select * from t1 where t1.id == t2.id)
            |""".stripMargin).explain()
      }
    }
  }
}
{code}

After execution, we can observe the following:
 * {*}With DataSource V1{*}, the query plan clearly shows column pruning: only the {{id}} column is read during the scan.
{code:java}
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=90]
+- FileScan parquet [id#32L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
{code}
 * {*}With DataSource V2{*}, the query plan shows that no column pruning is applied: all columns are read from the underlying data source.

{code:java}
BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=152]
+- Project [id#58L]
   +- BatchScan parquet file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0ad680b-573c-4b11-b1b5-f50ee38fa81a/t1[id#58L, col1#59L, col2#60L, col3#61L, col4#62L, col5#63L, col6#64L, col7#65L, col8#66L, col9#67L] ParquetScan DataFilters: [], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0..., PartitionFilters: [], PushedAggregation: [], PushedFilters: [], PushedGroupBy: [], ReadSchema: struct<id:bigint,col1:bigint,col2:bigint,col3:bigint,col4:bigint,col5:bigint,col6:bigint,col7:big... RuntimeFilters: []
{code}
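The plan comparison above can also be checked mechanically rather than by eyeballing the explain output. The helper below is not part of Spark or the reported test, just a sketch: it extracts the column names from the bracketed output-attribute list of the first scan node in a plan string, assuming the {{name#exprId}} attribute format and the {{FileScan}}/{{BatchScan}} node names shown in the plans above.

{code:java}
// Hypothetical helper: pull the scanned column names out of a plan string.
// Assumes the scan node prints its output as "[name#exprId, ...]" right after
// the node name, as in the FileScan / BatchScan lines above.
def scanColumns(planText: String): Seq[String] =
  planText.linesIterator
    .find(l => l.contains("BatchScan") || l.contains("FileScan"))
    .flatMap(line => "\\[([^\\]]*)\\]".r.findFirstMatchIn(line))
    .map(_.group(1)
      .split(",").toSeq
      .map(_.trim.takeWhile(_ != '#')) // "id#58L" -> "id"
      .filter(_.nonEmpty))
    .getOrElse(Seq.empty)
{code}

With a helper like this, the test could assert {{scanColumns(plan) == Seq("id")}} under both source configurations instead of printing the plans, which would fail for the V2 case reported here.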