beliefer commented on code in PR #49961:
URL: https://github.com/apache/spark/pull/49961#discussion_r1985896731


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScanBuilder.scala:
##########
@@ -25,6 +27,40 @@ class PythonScanBuilder(
     ds: PythonDataSourceV2,
     shortName: String,
     outputSchema: StructType,
-    options: CaseInsensitiveStringMap) extends ScanBuilder {
-  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, 
options)
+    options: CaseInsensitiveStringMap)
+    extends ScanBuilder
+    with SupportsPushDownFilters {
+  private var supportedFilters: Array[Filter] = Array.empty
+
+  private def metadata: Map[String, String] = {
+    Map(
+      "PushedFilters" -> supportedFilters.mkString("[", ", ", "]"),
+      "ReadSchema" -> outputSchema.simpleString
+    )
+  }
+
+  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, 
options, metadata)
+
+  // Optionally called by DSv2 once to push down filters before the scan is 
built.
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    if (!SQLConf.get.pythonFilterPushDown) {
+      return filters
+    }
+
+    val dataSource = ds.getOrCreateDataSourceInPython(shortName, options, 
Some(outputSchema))
+    val result = ds.source.pushdownFiltersInPython(dataSource, outputSchema, 
filters)
+
+    // The Data Source instance state changes after pushdown to remember the 
reader instance
+    // created and the filters pushed down. So pushdownFiltersInPython returns 
a new pickled
+    // Data Source instance. We need to use that new instance for further 
operations.
+    ds.setDataSourceInPython(dataSource.copy(dataSource = result.dataSource))
+
+    // Partition the filters into supported and unsupported ones.
+    val isPushed = result.isFilterPushed.zip(filters)
+    supportedFilters = isPushed.collect { case (true, filter) => filter 
}.toArray
+    val unsupported = isPushed.collect { case (false, filter) => filter 
}.toArray
+    unsupported

Review Comment:
   I think we should keep all the filters even if some of them can be pushed down.
   This will ensure robustness if the Python data source works incorrectly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to