PetarVasiljevic-DB commented on code in PR #50921: URL: https://github.com/apache/spark/pull/50921#discussion_r2192878985
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ########## @@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) } + def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp { + // Join can be attempted to be pushed down only if left and right side of join are + // compatible (same data source, for example). Also, another requirement is that if + // there are projections between Join and ScanBuilderHolder, these projections need to be + // AttributeReferences. We could probably support Alias as well, but this should be on + // TODO list. + // Alias can exist between Join and sHolder node because the query below is not valid: + // SELECT * FROM + // (SELECT * FROM tbl t1 JOIN tbl2 t2) p + // JOIN + // (SELECT * FROM tbl t3 JOIN tbl3 t4) q + // ON p.t1.col = q.t3.col (this is not possible) Review Comment: No, this is Spark query. If you want the above query to work, you need to assign aliases in Spark query. Otherwise, you have no way to reference different columns because `p.t1.col` is not valid ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ########## @@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) } + def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp { + // Join can be attempted to be pushed down only if left and right side of join are + // compatible (same data source, for example). Also, another requirement is that if + // there are projections between Join and ScanBuilderHolder, these projections need to be + // AttributeReferences. We could probably support Alias as well, but this should be on + // TODO list. + // Alias can exist between Join and sHolder node because the query below is not valid: + // SELECT * FROM + // (SELECT * FROM tbl t1 JOIN tbl2 t2) p + // JOIN + // (SELECT * FROM tbl t3 JOIN tbl3 t4) q + // ON p.t1.col = q.t3.col (this is not possible) + // It's because there are 2 same tables in both sides of top level join and it not possible Review Comment: Yes, of course. ########## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ########## @@ -265,6 +266,288 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel super.afterAll() } + test("Test 2-way join without condition - no join pushdown") { + val sqlQuery = "SELECT * FROM h2.test.employee a, h2.test.employee b" + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.nonEmpty) + checkAnswer(df, rows) + } + } + + test("Test multi-way join without condition - no join pushdown") { + val sqlQuery = """ + |SELECT * FROM + |h2.test.employee a, + |h2.test.employee b, + |h2.test.employee c + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.nonEmpty) + checkAnswer(df, rows) + } + } + + test("Test self join with condition") { + val sqlQuery = "SELECT * FROM h2.test.employee a JOIN h2.test.employee b ON a.dept = b.dept + 1" + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]") + checkAnswer(df, rows) + } + } + + test("Test multi-way self join with conditions") { + val sqlQuery = """ + |SELECT * FROM + |h2.test.employee a + |JOIN h2.test.employee b ON b.dept = a.dept + 1 + |JOIN h2.test.employee c ON c.dept = b.dept - 1 + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + assert(!rows.isEmpty) + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, h2.test.employee]") + checkAnswer(df, rows) + } + } + + test("Test self join with column pruning") { + val sqlQuery = """ + |SELECT a.dept + 2, b.dept, b.salary FROM + |h2.test.employee a JOIN h2.test.employee b + |ON a.dept = b.dept + 1 + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]") + checkAnswer(df, rows) + } + } + + test("Test 2-way join with column pruning - different tables") { + val sqlQuery = """ + |SELECT * FROM + |h2.test.employee a JOIN h2.test.people b + |ON a.dept = b.id + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.people]") + checkPushedInfo(df, + "PushedFilters: [DEPT IS NOT NULL, ID IS NOT NULL, DEPT = ID]") + checkAnswer(df, rows) + } + } + + test("Test multi-way self join with column pruning") { + val sqlQuery = """ + |SELECT a.dept, b.*, c.dept, c.salary + a.salary + |FROM h2.test.employee a + |JOIN h2.test.employee b ON b.dept = a.dept + 1 + |JOIN h2.test.employee c ON c.dept = b.dept - 1 + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, h2.test.employee]") + checkAnswer(df, rows) + } + } + + test("Test aliases not supported in join pushdown") { + val sqlQuery = """ + |SELECT a.dept, bc.* + |FROM h2.test.employee a + |JOIN ( + | SELECT b.*, c.dept AS c_dept, c.salary AS c_salary + | FROM h2.test.employee b + | JOIN h2.test.employee c ON c.dept = b.dept - 1 + |) bc ON bc.dept = a.dept + 1 + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + assert(joinNodes.nonEmpty) + checkAnswer(df, rows) + } + } + + test("Test aggregate on top of 2-way self join") { + val sqlQuery = """ + |SELECT min(a.dept + b.dept), min(a.dept) + |FROM h2.test.employee a + |JOIN h2.test.employee b ON a.dept = b.dept + 1 + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + val aggNodes = df.queryExecution.optimizedPlan.collect { + case a: Aggregate => a + } + + assert(joinNodes.isEmpty) + assert(aggNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]") + checkAnswer(df, rows) + } + } + + test("Test aggregate on top of multi-way self join") { + val sqlQuery = """ + |SELECT min(a.dept + b.dept), min(a.dept), min(c.dept - 2) + |FROM h2.test.employee a + |JOIN h2.test.employee b ON b.dept = a.dept + 1 + |JOIN h2.test.employee c ON c.dept = b.dept - 1 + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + val aggNodes = df.queryExecution.optimizedPlan.collect { + case a: Aggregate => a + } + + assert(joinNodes.isEmpty) + assert(aggNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, h2.test.employee]") + checkAnswer(df, rows) + } + } + + test("Test sort limit on top of join is pushed down") { + val sqlQuery = """ + |SELECT min(a.dept + b.dept), a.dept, b.dept + |FROM h2.test.employee a + |JOIN h2.test.employee b ON b.dept = a.dept + 1 + |GROUP BY a.dept, b.dept + |ORDER BY a.dept + |LIMIT 1 + |""".stripMargin + + val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "false") { + sql(sqlQuery).collect().toSeq + } + + withSQLConf( + SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") { + val df = sql(sqlQuery) + val joinNodes = df.queryExecution.optimizedPlan.collect { + case j: Join => j + } + + val sortNodes = df.queryExecution.optimizedPlan.collect { + case s: Sort => s + } + + val limitNodes = df.queryExecution.optimizedPlan.collect { + case l: GlobalLimit => l + } + + assert(joinNodes.isEmpty) + assert(sortNodes.isEmpty) + assert(limitNodes.isEmpty) + checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]") + checkAnswer(df, rows) + } + } + Review Comment: I created a separate suite. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org