wengh commented on code in PR #49961:
URL: https://github.com/apache/spark/pull/49961#discussion_r1985529476


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScanBuilder.scala:
##########
@@ -25,6 +27,40 @@ class PythonScanBuilder(
     ds: PythonDataSourceV2,
     shortName: String,
     outputSchema: StructType,
-    options: CaseInsensitiveStringMap) extends ScanBuilder {
-  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, options)
+    options: CaseInsensitiveStringMap)
+    extends ScanBuilder
+    with SupportsPushDownFilters {
+  private var supportedFilters: Array[Filter] = Array.empty
+
+  private def metadata: Map[String, String] = {
+    Map(
+      "PushedFilters" -> supportedFilters.mkString("[", ", ", "]"),
+      "ReadSchema" -> outputSchema.simpleString
+    )
+  }
+
+  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, options, metadata)
+
+  // Optionally called by DSv2 once to push down filters before the scan is built.
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    if (!SQLConf.get.pythonFilterPushDown) {

Review Comment:
   We'd like to avoid the new code path (serializing filters, running a new Python worker, ...) for existing Python data sources that don't implement pushdown, so that if the new code path has a crash or a performance issue, its impact is limited.
   
   However, we currently don't have a good way to detect whether the user has implemented `pushFilters()` in the Python `DataSourceReader` before `ScanBuilder.pushFilters()` is called. At this point we don't know whether it's a streaming read or a batch read (the optimizer knows, but the data source doesn't get this information), so it's not safe to call the Python `DataSource.reader()` to get the batch reader instance.
   
   So instead we add a conf to turn off the new code path. Also, if the user implements `pushFilters()` while this conf is disabled, we throw an error telling them to turn on the conf to enable filter pushdown.
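To make the two halves of this gating concrete, here is a minimal, Spark-free sketch under stated assumptions. `Filter`, `PythonReaderInfo`, `GatedScanBuilder`, the `pushToPython` callback, and `PushdownGate.checkReader` are hypothetical stand-ins, not Spark APIs. `pushFilters` follows the DSv2 `SupportsPushDownFilters` contract (the return value is the set of filters Spark must still evaluate after the scan), and we assume the "implemented `pushFilters()` but conf disabled" check can only run later, once it is safe to create the batch reader.

```scala
// Hypothetical stand-in for a Spark data source Filter (not org.apache.spark.sql.sources.Filter).
final case class Filter(sql: String)

// Hypothetical: what the JVM side learns once the Python batch reader exists.
final case class PythonReaderInfo(implementsPushFilters: Boolean)

final class GatedScanBuilder(pushdownEnabled: Boolean) {
  // Filters accepted by the Python reader; would be reported as "PushedFilters".
  private var supportedFilters: Array[Filter] = Array.empty

  // Returns the filters Spark must still evaluate after the scan.
  def pushFilters(
      filters: Array[Filter],
      pushToPython: Array[Filter] => Array[Filter]): Array[Filter] = {
    if (!pushdownEnabled) {
      // Conf off: never serialize filters or start a Python worker.
      filters
    } else {
      // Conf on: the (hypothetical) Python round trip returns the rejected filters.
      val rejected = pushToPython(filters)
      supportedFilters = filters.filterNot(f => rejected.contains(f))
      rejected
    }
  }

  def pushedFilters(): Array[Filter] = supportedFilters
}

object PushdownGate {
  // Assumed to run later, once the batch reader can safely be created:
  // fail loudly if the user implemented pushFilters() but the conf is off.
  def checkReader(info: PythonReaderInfo, pushdownEnabled: Boolean): Unit =
    if (info.implementsPushFilters && !pushdownEnabled) {
      throw new IllegalStateException(
        "The Python DataSourceReader implements pushFilters(), but filter " +
          "pushdown is disabled; enable the conf to use it.")
    }
}
```

With the conf off, `pushFilters` returns every filter unchanged and never invokes the Python callback, so existing data sources keep their old behavior. The misconfiguration error is deferred to `checkReader`, which is where the reader type is actually known.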
   
   In the future, if we figure out how to check whether the Python reader implements `pushFilters()`, we can enable this conf by default and deprecate it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: reviews-h...@spark.apache.org