beliefer commented on code in PR #50060:
URL: https://github.com/apache/spark/pull/50060#discussion_r1976798658


##########
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala:
##########
@@ -355,4 +356,16 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCT
       parameters = Map("pos" -> "0", "type" -> "\"ARRAY<ARRAY<INT>>\"")
     )
   }
+
+  test("SPARK-51321: Postgres pushdown for RPAD expression on string column") {
+    val df = sql(s"SELECT * FROM $catalogName.employee WHERE rpad(name, 10, 
'x') = 'xxxxxxxxxx'")
+    val pushedQuery = 
df.queryExecution.executedPlan.toString().toLowerCase(Locale.ROOT)

Review Comment:
   Please use `checkFilterPushed` here.



##########
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala:
##########
@@ -225,4 +226,44 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JD
     // scalastyle:on
     df.collect()
   }
+
+  test("SPARK-51321: SQLServer pushdown for RPAD expression on string column") 
{
+    // Query that applies RPAD on the "name" column, expecting a padded result 
of 10 characters.
+    val df = sql(s"SELECT * FROM $catalogName.employee WHERE rpad(name, 10, 
'x') = 'xxxxxxxxxx'")
+    val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan)
+      .toLowerCase(Locale.ROOT)
+
+    // Check that the pushed-down query contains the expected function calls.
+    assert(pushedQuery.contains("left("),
+      s"Expected LEFT function call in the pushed-down query, but got: 
$pushedQuery")
+    assert(pushedQuery.contains("concat("),
+      s"Expected CONCAT function call in the pushed-down query, but got: 
$pushedQuery")
+    assert(pushedQuery.contains("replicate("),
+      s"Expected REPLICATE function call in the pushed-down query, but got: 
$pushedQuery")
+
+    assert(!pushedQuery.contains("filterexec"),
+      s"Expected the filter to be pushed down, but found a Spark-side filter 
in: $pushedQuery")
+
+    df.collect()
+  }
+
+  test("SPARK-51321: SQLServer pushdown for LPAD expression on string column") 
{
+    // Query that applies LPAD on the "name" column, expecting a padded result 
of 10 characters.
+    val df = sql(s"SELECT * FROM $catalogName.employee WHERE lpad(name, 10, 
'x') = 'xxxxxxxxxx'")
+    val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan)

Review Comment:
   Please use `checkFilterPushed` here.



##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala:
##########
@@ -48,7 +48,10 @@ private case class PostgresDialect()
   private val supportedAggregateFunctions = Set("MAX", "MIN", "SUM", "COUNT", 
"AVG",
     "VAR_POP", "VAR_SAMP", "STDDEV_POP", "STDDEV_SAMP", "COVAR_POP", 
"COVAR_SAMP", "CORR",
     "REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY")
-  private val supportedFunctions = supportedAggregateFunctions
+  private val supportedStringFunctions = Set("RPAD")
+  private val supportedFunctions =
+    supportedAggregateFunctions ++
+      supportedStringFunctions

Review Comment:
   ```suggestion
     private val supportedFunctions = supportedAggregateFunctions ++ 
supportedStringFunctions
   ```



##########
connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala:
##########
@@ -225,4 +226,44 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JD
     // scalastyle:on
     df.collect()
   }
+
+  test("SPARK-51321: SQLServer pushdown for RPAD expression on string column") 
{
+    // Query that applies RPAD on the "name" column, expecting a padded result 
of 10 characters.
+    val df = sql(s"SELECT * FROM $catalogName.employee WHERE rpad(name, 10, 
'x') = 'xxxxxxxxxx'")
+    val pushedQuery = getExternalEngineQuery(df.queryExecution.executedPlan)

Review Comment:
   Please use `checkFilterPushed` here as well.



##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala:
##########
@@ -84,6 +84,17 @@ private case class MsSqlServerDialect() extends JdbcDialect 
with NoLegacyJDBCErr
       case _ => super.dialectFunctionName(funcName)
     }
 
+    override def visitSQLFunction(funcName: String, inputs: Array[String]): 
String =
+      funcName match {
+        case "RPAD" =>
+          val Array(str, len, pad) = inputs
+          s"LEFT(CONCAT($str, REPLICATE($pad, $len)), $len)"
+        case "LPAD" =>
+          val Array(str, len, pad) = inputs
+          s"RIGHT(CONCAT(REPLICATE($pad, $len), $str), $len)"
+        case _ => super.visitSQLFunction(funcName, inputs)
+      }

Review Comment:
   Are you sure this works correctly? In particular, for `LPAD`: when the input string is longer than the target length, Spark's `lpad` shortens the result to the *first* `len` characters, but `RIGHT(CONCAT(REPLICATE($pad, $len), $str), $len)` keeps the *last* `len` characters of the string. Please add a test covering inputs longer than the pad length (and an empty pad string).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to