beliefer commented on code in PR #49961: URL: https://github.com/apache/spark/pull/49961#discussion_r1984797430
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScanBuilder.scala:
##########

@@ -25,6 +27,40 @@ class PythonScanBuilder(
     ds: PythonDataSourceV2,
     shortName: String,
     outputSchema: StructType,
-    options: CaseInsensitiveStringMap) extends ScanBuilder {
-  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, options)
+    options: CaseInsensitiveStringMap)
+  extends ScanBuilder
+  with SupportsPushDownFilters {
+  private var supportedFilters: Array[Filter] = Array.empty
+
+  private def metadata: Map[String, String] = {
+    Map(
+      "PushedFilters" -> supportedFilters.mkString("[", ", ", "]"),
+      "ReadSchema" -> outputSchema.simpleString
+    )
+  }
+
+  override def build(): Scan = new PythonScan(ds, shortName, outputSchema, options, metadata)
+
+  // Optionally called by DSv2 once to push down filters before the scan is built.
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    if (!SQLConf.get.pythonFilterPushDown) {
+      return filters
+    }
+
+    val dataSource = ds.getOrCreateDataSourceInPython(shortName, options, Some(outputSchema))
+    val result = ds.source.pushdownFiltersInPython(dataSource, outputSchema, filters)
+
+    // The Data Source instance state changes after pushdown to remember the reader instance
+    // created and the filters pushed down. So pushdownFiltersInPython returns a new pickled
+    // Data Source instance. We need to use that new instance for further operations.
+    ds.setDataSourceInPython(dataSource.copy(dataSource = result.dataSource))
+
+    // Partition the filters into supported and unsupported ones.
+    val isPushed = result.isFilterPushed.zip(filters)
+    supportedFilters = isPushed.collect { case (true, filter) => filter }.toArray
+    val unsupported = isPushed.collect { case (false, filter) => filter }.toArray
+    unsupported

Review Comment:
   Do we need to keep all the filters? It would be more robust.
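For context on the question above: under the SupportsPushDownFilters contract, whatever pushFilters returns is evaluated again by Spark on top of the scan output, so returning all of the original filters (rather than only the unsupported ones) costs some redundant evaluation but stays correct even if the source mis-reports what it pushed. Below is a minimal, self-contained sketch of the zip/collect partitioning; PushdownPartitionSketch and its Filter stand-in are illustrative names, not code from this PR.

object PushdownPartitionSketch {
  // Stand-in for org.apache.spark.sql.sources.Filter so the sketch is self-contained.
  final case class Filter(sql: String)

  // Mirrors the zip/collect partitioning in pushFilters: the per-filter flags from the
  // Python side decide which filters were pushed and which must be re-applied after the scan.
  def partition(
      filters: Array[Filter],
      isFilterPushed: Seq[Boolean]): (Array[Filter], Array[Filter]) = {
    val flagged = isFilterPushed.zip(filters)
    val pushed = flagged.collect { case (true, f) => f }.toArray
    val postScan = flagged.collect { case (false, f) => f }.toArray
    (pushed, postScan)
  }

  def main(args: Array[String]): Unit = {
    val filters = Array(Filter("a > 1"), Filter("b = 'x'"), Filter("c IS NULL"))
    val (pushed, postScan) = partition(filters, Seq(true, false, true))
    // Whatever pushFilters returns is evaluated again by Spark after the scan, so
    // returning all `filters` instead of only `postScan` trades some redundant
    // evaluation for robustness against wrong pushed-flags.
    println(s"pushed:    ${pushed.mkString(", ")}")
    println(s"post-scan: ${postScan.mkString(", ")}")
  }
}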
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.