urosstan-db commented on code in PR #50921: URL: https://github.com/apache/spark/pull/50921#discussion_r2195219122
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,142 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }

+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of join are
+    // compatible (same data source, for example). Also, another requirement is that if
+    // there are projections between Join and ScanBuilderHolder, these projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below is not valid:
+    // SELECT * FROM
+    //   (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    //   (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are duplicated columns in both sides of top level join and it's not
+    // possible to fully qualify the column names in condition. Therefore, query should be
+    // rewritten so that each of the outputs of child joins are aliased, so there would be a
+    // projection with aliases between top level join and scanBuilderHolder (that has pushed
+    // child joins).
+    case node @ Join(
+        PhysicalOperation(
+          leftProjections,
+          Nil,
+          leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+        ),
+        PhysicalOperation(
+          rightProjections,
+          Nil,
+          rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+        ),
+        joinType,
+        condition,
+        _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&
+      lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+      val leftSideRequiredColumnNames = getRequiredColumnNames(leftProjections, leftHolder)
+      val rightSideRequiredColumnNames = getRequiredColumnNames(rightProjections, rightHolder)
+
+      // Alias the duplicated columns from left side of the join.
+      val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map { name =>
+        val aliasName =
+          if (leftSideRequiredColumnNames.count(_ == name) > 1 ||

Review Comment:
   If we decide to keep optional aliasing, it would be good to build a map[name, count] here to avoid the N^2 complexity. I suppose the column limit is not huge, but I have seen tables and queries with 1000 columns, which would mean roughly 1M operations here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
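As an editorial aside for readers of this thread, the map-based rewrite the reviewer suggests can be sketched as below. This is a minimal standalone Scala sketch, not the PR's actual code: `leftSideRequiredColumnNames` is hypothetical sample data, and only the duplicate-detection step is shown (the aliasing itself is omitted).

```scala
// Hypothetical column list standing in for leftSideRequiredColumnNames.
val leftSideRequiredColumnNames = Seq("id", "name", "id", "price")

// Approach in the diff: O(N^2), one full scan of the list per column.
val quadratic = leftSideRequiredColumnNames.map { name =>
  leftSideRequiredColumnNames.count(_ == name) > 1
}

// Suggested approach: precompute a Map[name, count] once in O(N),
// then each duplicate check becomes an O(1) lookup.
val occurrences: Map[String, Int] =
  leftSideRequiredColumnNames.groupBy(identity).map { case (n, vs) => n -> vs.size }

val linear = leftSideRequiredColumnNames.map(name => occurrences(name) > 1)

// Both approaches flag exactly the same columns as duplicated.
assert(quadratic == linear)
```

The two versions are behaviorally identical; the map only changes the cost from one `count` scan per column to a single grouping pass, which is the point of the review comment.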