cloud-fan commented on code in PR #50921: URL: https://github.com/apache/spark/pull/50921#discussion_r2192155725
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ########## @@ -98,6 +107,138 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper { filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder) } + def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp { + // Join can be attempted to be pushed down only if left and right side of join are + // compatible (same data source, for example). Also, another requirement is that if + // there are projections between Join and ScanBuilderHolder, these projections need to be + // AttributeReferences. We could probably support Alias as well, but this should be on + // TODO list. + // Alias can exist between Join and sHolder node because the query below is not valid: + // SELECT * FROM + // (SELECT * FROM tbl t1 JOIN tbl2 t2) p + // JOIN + // (SELECT * FROM tbl t3 JOIN tbl3 t4) q + // ON p.t1.col = q.t3.col (this is not possible) + // It's because the same table appears on both sides of the top level join and it is not possible + // to fully qualify the column names in the condition. Therefore, query should be rewritten so + // that each of the outputs of child joins are aliased, so there would be a projection + // with aliases between top level join and scanBuilderHolder (that has pushed child joins). + case node @ Join( + PhysicalOperation( + leftProjections, + Nil, + leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin) + ), + PhysicalOperation( + rightProjections, + Nil, + rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin) + ), + joinType, + condition, + _) if conf.dataSourceV2JoinPushdown && + // We do not support pushing down anything besides AttributeReference. + leftProjections.forall(_.isInstanceOf[AttributeReference]) && + rightProjections.forall(_.isInstanceOf[AttributeReference]) && + // Cross joins are not supported because they increase the amount of data. 
+ condition.isDefined && + // Joins on top of sampled tables are not supported + leftHolder.pushedSample.isEmpty && + rightHolder.pushedSample.isEmpty && + lBuilder.isOtherSideCompatibleForJoin(rBuilder) => + + // projections' names may not be up to date if the joins have been previously pushed down. + // For this reason, we need to use pushedJoinOutputMap to get up to date names. + // + // Normalized projections are then converted to StructType. + def getRequiredSchema( + projections: Seq[NamedExpression], + sHolder: ScanBuilderHolder): StructType = { + val normalizedProjections = DataSourceStrategy.normalizeExprs( + projections, + sHolder.output.map { a => + sHolder.pushedJoinOutputMap.getOrElse(a, a).asInstanceOf[AttributeReference] + } + ).asInstanceOf[Seq[AttributeReference]] + + fromAttributes(normalizedProjections) + } + + def generateJoinOutputAlias(name: String): String = + s"${name}_${java.util.UUID.randomUUID().toString.take(5)}" + + val leftSideRequiredSchema = getRequiredSchema(leftProjections, leftHolder) + val rightSideRequiredSchema = getRequiredSchema(rightProjections, rightHolder) + + // Alias the duplicated columns from left side of the join. + val leftSideRequiredSchemaWithAliases = leftSideRequiredSchema.fields.map { field => + val aliasName = if (rightSideRequiredSchema.fieldNames.contains(field.name)) { + generateJoinOutputAlias(field.name) + } else { + null + } + + new SupportsPushDownJoin.ColumnWithAlias(field.name, aliasName) + } + + // Aliasing of duplicated columns in right side of the join is not needed because + // the conflicts are resolved by aliasing the left side. + val rightSideRequiredSchemaWithAliases = rightSideRequiredSchema.fields.map { field => + new SupportsPushDownJoin.ColumnWithAlias(field.name, null) + } + + // Create the AttributeMap that holds (Attribute -> Attribute with up to date name) mapping. 
+ val pushedJoinOutputMap = AttributeMap[Expression]( + node.output.asInstanceOf[Seq[AttributeReference]] + .zip(leftSideRequiredSchemaWithAliases ++ rightSideRequiredSchemaWithAliases) + .map{ case (attr, columnWithAlias) => + if (columnWithAlias.getAlias == null) { + (attr, attr) Review Comment: When we rewrite attributes, we do `pushedJoinOutputMap.getOrElse(a, a)`. I think we don't need to put these entries into the map, as the key and value are the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org