beliefer commented on code in PR #50060: URL: https://github.com/apache/spark/pull/50060#discussion_r1976798658
########## connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala: ########## @@ -355,4 +356,16 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT parameters = Map("pos" -> "0", "type" -> "\"ARRAY<ARRAY<INT>>\"") ) } + + test("SPARK-51321: Postgres pushdown for RPAD expression on string column") { + val df = sql(s"SELECT * FROM $catalogName.employee WHERE rpad(name, 10, 'x') = 'xxxxxxxxxx'") + val pushedQuery = df.queryExecution.executedPlan.toString().toLowerCase(Locale.ROOT) Review Comment: Please use `checkFilterPushed` here. ########## connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala: ########## @@ -225,4 +226,44 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD // scalastyle:on df.collect() } + + test("SPARK-51321: SQLServer pushdown for RPAD expression on string column") { + // Query that applies RPAD on the "name" column, expecting a padded result of 10 characters. + val df = sql(s"SELECT * FROM $catalogName.employee WHERE rpad(name, 10, 'x') = 'xxxxxxxxxx'") + val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan) + .toLowerCase(Locale.ROOT) + + // Check that the pushed-down query contains the expected function calls. + assert(pushedQuery.contains("left("), + s"Expected LEFT function call in the pushed-down query, but got: $pushedQuery") + assert(pushedQuery.contains("concat("), + s"Expected CONCAT function call in the pushed-down query, but got: $pushedQuery") + assert(pushedQuery.contains("replicate("), + s"Expected REPLICATE function call in the pushed-down query, but got: $pushedQuery") + + assert(!pushedQuery.contains("filterexec"), + s"Expected the filter to be pushed down, but found a Spark-side filter in: $pushedQuery") + + df.collect() + } + + test("SPARK-51321: SQLServer pushdown for LPAD expression on string column") { + // Query that applies LPAD on the "name" column, expecting a padded result of 10 characters. + val df = sql(s"SELECT * FROM $catalogName.employee WHERE lpad(name, 10, 'x') = 'xxxxxxxxxx'") + val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan) Review Comment: Please use `checkFilterPushed` here. ########## sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala: ########## @@ -48,7 +48,10 @@ private case class PostgresDialect() private val supportedAggregateFunctions = Set("MAX", "MIN", "SUM", "COUNT", "AVG", "VAR_POP", "VAR_SAMP", "STDDEV_POP", "STDDEV_SAMP", "COVAR_POP", "COVAR_SAMP", "CORR", "REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY") - private val supportedFunctions = supportedAggregateFunctions + private val supportedStringFunctions = Set("RPAD") + private val supportedFunctions = + supportedAggregateFunctions ++ + supportedStringFunctions Review Comment: ```suggestion private val supportedFunctions = supportedAggregateFunctions ++ supportedStringFunctions ``` ########## connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala: ########## @@ -225,4 +226,44 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD // scalastyle:on df.collect() } + + test("SPARK-51321: SQLServer pushdown for RPAD expression on string column") { + // Query that applies RPAD on the "name" column, expecting a padded result of 10 characters. + val df = sql(s"SELECT * FROM $catalogName.employee WHERE rpad(name, 10, 'x') = 'xxxxxxxxxx'") + val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan) Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala: ########## @@ -84,6 +84,17 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr case _ => super.dialectFunctionName(funcName) } + override def visitSQLFunction(funcName: String, inputs: Array[String]): String = + funcName match { + case "RPAD" => + val Array(str, len, pad) = inputs + s"LEFT(CONCAT($str, REPLICATE($pad, $len)), $len)" + case "LPAD" => + val Array(str, len, pad) = inputs + s"RIGHT(CONCAT(REPLICATE($pad, $len), $str), $len)" + case _ => super.visitSQLFunction(funcName, inputs) + } Review Comment: Are you sure it works well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org