[ https://issues.apache.org/jira/browse/SPARK-25799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Max Kießling resolved SPARK-25799.
----------------------------------
    Resolution: Fixed
 Fix Version/s: 2.4.0

> DataSourceApiV2 scan reuse does not respect options
> ---------------------------------------------------
>
>                 Key: SPARK-25799
>                 URL: https://issues.apache.org/jira/browse/SPARK-25799
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2
>            Reporter: Max Kießling
>            Priority: Major
>             Fix For: 2.4.0
>
>
> When creating a custom data source with the Data Source API V2, the
> computation of possible scan reuses appears to be broken when the same data
> source is used more than once in a query but configured with different
> options. When both scans produce the same schema (which is always the case
> for count queries with column pruning enabled), the optimizer reuses the
> scan produced by one of the data source instances for both branches of the
> query. This can lead to wrong results whenever the configuration options
> influence the returned data.
> The behavior can be reproduced with the following example:
> {code:scala}
> import java.util.{List => JList}
>
> import org.apache.spark.sql.sources.v2.reader._
> import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
> import org.apache.spark.sql.types.StructType
> import org.apache.spark.sql.{Row, SparkSession}
>
> // Data source whose total row count is controlled by the "rows" option.
> class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
>
>   class Reader(rowCount: Int) extends DataSourceReader
>     with SupportsPushDownRequiredColumns {
>
>     var requiredSchema = new StructType().add("i", "int").add("j", "int")
>
>     override def pruneColumns(requiredSchema: StructType): Unit = {
>       this.requiredSchema = requiredSchema
>     }
>
>     override def readSchema(): StructType = {
>       requiredSchema
>     }
>
>     override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
>       val res = new java.util.ArrayList[DataReaderFactory[Row]]
>       res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
>       res.add(new AdvancedDataReaderFactory(5, rowCount, requiredSchema))
>       res
>     }
>   }
>
>   override def createReader(options: DataSourceOptions): DataSourceReader =
>     new Reader(options.get("rows").orElse("10").toInt)
> }
>
> // Produces rows with i in [start, end), projected to the pruned schema.
> class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
>   extends DataReaderFactory[Row] with DataReader[Row] {
>
>   private var current = start - 1
>
>   override def createDataReader(): DataReader[Row] = {
>     new AdvancedDataReaderFactory(start, end, requiredSchema)
>   }
>
>   override def close(): Unit = {}
>
>   override def next(): Boolean = {
>     current += 1
>     current < end
>   }
>
>   override def get(): Row = {
>     val values = requiredSchema.map(_.name).map {
>       case "i" => current
>       case "j" => -current
>     }
>     Row.fromSeq(values)
>   }
> }
>
> object DataSourceTest extends App {
>   val spark = SparkSession.builder().master("local[*]").getOrCreate()
>
>   val cls = classOf[AdvancedDataSourceV2]
>   // Same data source, different "rows" options: 100 rows vs. 10 rows.
>   val with100 = spark.read.format(cls.getName).option("rows", 100).load()
>   val with10 = spark.read.format(cls.getName).option("rows", 10).load()
>
>   // Fails on affected versions because one scan is reused for both branches.
>   assert(with100.union(with10).count == 110)
> }
> {code}
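As a follow-up sketch (not part of the original report), assuming the AdvancedDataSourceV2 class from the reproduction above is compiled and on the classpath, one way to narrow the failure down is to check that each read is correct in isolation and then inspect the extended plan of the union; the hypothetical ScanReuseCheck object below does both.

{code:scala}
import org.apache.spark.sql.SparkSession

// Hypothetical follow-up to the reproduction above; assumes AdvancedDataSourceV2
// from the original report is available on the classpath (default package).
object ScanReuseCheck extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  val cls = classOf[AdvancedDataSourceV2]
  val with100 = spark.read.format(cls.getName).option("rows", 100).load()
  val with10 = spark.read.format(cls.getName).option("rows", 10).load()

  // Each scan honours its own "rows" option when planned on its own.
  assert(with100.count == 100)
  assert(with10.count == 10)

  // On affected versions the union would be expected to double one of the two
  // counts (200 or 20) instead of returning 110, since a single scan is reused
  // for both branches.
  println(with100.union(with10).count)

  // The extended plan shows whether both branches point at the same scan.
  with100.union(with10).explain(true)
}
{code}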