PetarVasiljevic-DB commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2192715977


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,114 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+
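+    // Both sides must be JDBC scans against the same connection URL so that the
+    // join can be executed as a single query on one data source.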
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  }
+
+  /**
+   * Helper method to calculate the StructType based on the given schema and the
+   * SupportsPushDownJoin.ColumnWithAlias objects.
+   *
+   * If a ColumnWithAlias object has a defined alias, a new field whose name is equal to the
+   * alias is returned. Otherwise, the original field is returned.
+   */
+  private def calculateJoinOutputSchema(
+      columnsWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      schema: StructType): StructType = {
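+    // For example, given columnsWithAliases [("id", alias = null), ("name", alias = "n")]
+    // and schema (id INT, name STRING), the result is (id INT, n STRING); an aliased
+    // field keeps the original dataType, nullability and metadata under the new name.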
+    var newSchema = StructType(Seq())
+    columnsWithAliases.foreach { columnWithAlias =>
+      val colName = columnWithAlias.getColName
+      val alias = columnWithAlias.getAlias
+      val field = schema(colName)
+
+      val newName = if (alias == null) colName else alias
+      newSchema = newSchema.add(newName, field.dataType, field.nullable, field.metadata)
+    }
+
+    newSchema
+  }
+
+  override def pushDownJoin(
+      other: SupportsPushDownJoin,
+      joinType: JoinType,
+      leftSideRequiredColumnWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      rightSideRequiredColumnWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      condition: Predicate): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+    val otherJdbcScanBuilder = other.asInstanceOf[JDBCScanBuilder]
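+    // The cast is assumed to be safe because Spark is expected to call
+    // isOtherSideCompatibleForJoin for this pair of builders before pushDownJoin.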
+
+    // Build the SQL queries for the left and right sides of the join. These will be
+    // used as subqueries in the final join query.
+    val sqlQuery = buildSQLQueryUsedInJoinPushDown(leftSideRequiredColumnWithAliases)
+    val otherSideSqlQuery = otherJdbcScanBuilder
+      .buildSQLQueryUsedInJoinPushDown(rightSideRequiredColumnWithAliases)
+
+    // requiredSchema will become the finalSchema of this JDBCScanBuilder.
+    var requiredSchema =
+      calculateJoinOutputSchema(leftSideRequiredColumnWithAliases, finalSchema)
+    requiredSchema = requiredSchema.merge(
+      calculateJoinOutputSchema(
+        rightSideRequiredColumnWithAliases,
+        otherJdbcScanBuilder.finalSchema
+      )
+    )
+
+    val joinOutputColumnsString =
+      requiredSchema.fields.map(f => dialect.quoteIdentifier(f.name)).mkString(",")
+
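+    // Only INNER JOIN is translated to SQL for now; for any other join type this
+    // method returns false and the join is not pushed down.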
+    val joinTypeStringOption = joinType match {
+      case JoinType.INNER_JOIN => Some("INNER JOIN")
+      case _ => None
+    }
+
+    if (joinTypeStringOption.isEmpty) return false
+
+    val compiledCondition = dialect.compileExpression(condition)
+    if (compiledCondition.isEmpty) return false
+
+    val conditionString = compiledCondition.get
+
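+    // The query built below has roughly the following shape (illustrative example only;
+    // subquery_0 and subquery_1 stand in for the generated subquery qualifiers):
+    //   SELECT "ID", "NAME" FROM
+    //   (SELECT ... FROM table1) subquery_0
+    //   INNER JOIN
+    //   (SELECT ... FROM table2) subquery_1
+    //   ON <compiled join condition>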
+    val joinQuery = s"""
+      |SELECT $joinOutputColumnsString FROM
+      |($sqlQuery) ${JoinPushdownAliasGenerator.getSubqueryQualifier}
+      |${joinTypeStringOption.get}
+      |($otherSideSqlQuery) ${JoinPushdownAliasGenerator.getSubqueryQualifier}
+      |ON $conditionString
+      |""".stripMargin
+
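+    // Expose the pushed-down join through the JDBC "query" option and drop the table
+    // option, so the resulting scan reads from the join query instead of a single table.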
+    val newMap = jdbcOptions.parameters.originalMap +
+      (JDBCOptions.JDBC_QUERY_STRING -> joinQuery) - (JDBCOptions.JDBC_TABLE_NAME)

Review Comment:
   No, it's whatever.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

