I think this is expected behavior, though not what I consider reasonable in the long term. To my knowledge, this is how the v1 sources behave, and v2 simply reuses the same mechanism to instantiate sources, adding a new interface for the v2 features.
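In other words, every operation resolves the source class and news up a fresh instance by reflection, so nothing set on one instance is visible to the next. A purely illustrative sketch (plain reflection, not Spark's actual lookup code; it just reuses the class name from the thread below):

import java.lang.reflect.Constructor;

public class TwoInstances {
  public static void main(String[] args) throws Exception {
    // Resolve the source class by name, as a reflection-based loader would.
    Class<?> cls = Class.forName("com.shubham.MyDataSource");
    Constructor<?> ctor = cls.getDeclaredConstructor();

    // Two separate instantiations produce two unrelated objects:
    // fields assigned on the first are never seen by the second.
    Object forSchema = ctor.newInstance();
    Object forPlanning = ctor.newInstance();
    System.out.println(forSchema == forPlanning);  // false
  }
}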
I think that the right approach is to use catalogs, which I've proposed in #21306 <https://github.com/apache/spark/pull/21306>. A catalog would be loaded by reflection just once and then configured. After that, the same instance would be reused for a given Spark SQL session. Because the catalog instantiates the table instances that expose read and write capabilities (ReadSupport, WriteSupport), it can choose how to manage the life-cycle of those tables and can also cache instances to control how table state changes after a table is loaded. (Iceberg does this to use a fixed snapshot for all reads until the table is written to or is garbage collected.)

rb
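As a rough illustration of that life-cycle (the interfaces and method names below are hypothetical, not the API actually proposed in #21306):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.sql.types.StructType;

// Hypothetical interfaces, for illustration only.
interface Table {                        // a loaded table exposing read/write capabilities
  StructType schema();
}

interface TableCatalog {                 // loaded by reflection once, then configured
  void initialize(Map<String, String> conf);
  Table loadTable(String name);
}

// One catalog instance per session: it owns the table life-cycle and can cache
// table instances to control when table state (e.g. a snapshot) is refreshed.
class CachingCatalog implements TableCatalog {
  private final Map<String, Table> tables = new ConcurrentHashMap<>();
  private Map<String, String> conf;

  @Override
  public void initialize(Map<String, String> conf) {
    this.conf = conf;                    // configured once, right after reflection
  }

  @Override
  public Table loadTable(String name) {
    // Reuse the cached table instance until it is invalidated, so all reads in
    // the session see one consistent version of the table.
    return tables.computeIfAbsent(name, this::newTable);
  }

  private Table newTable(String name) {
    // Stand-in for loading table metadata (schema, snapshot, ...) from the store.
    return () -> new StructType().add("col1", "int").add("col2", "string");
  }
}

Because the session holds on to one such catalog, the load-and-configure step would happen once instead of on every read.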
On Tue, Oct 9, 2018 at 8:30 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:

> I took a look at the code.
>
> val source = classOf[MyDataSource].getCanonicalName
> spark.read.format(source).load().collect()
>
> It does indeed get called twice.
>
> First call: it creates a reader to read the schema for the logical plan:
>
> test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
> test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:172)
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:204)
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>
> Second call: it creates another one for the actual partitions in the physical plan:
>
> test.org.apache.spark.sql.sources.v2.MyDataSourceReader.<init>(MyDataSourceReader.java:36)
> test.org.apache.spark.sql.sources.v2.MyDataSource.createReader(MyDataSource.java:35)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:155)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.newReader(DataSourceV2Relation.scala:61)
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$.apply(DataSourceV2Strategy.scala:103)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
> scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
> org.apache.spark.sql.Dataset.withAction(Dataset.scala:3360)
> org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
>
> Skimming the API doc for DataSourceReader on branch-2.4, I haven't found a guarantee that the reader is created only once. If that's documented somewhere, we should fix it in 2.4.0. If not, I think it's fine, since both calls are on the driver side and it's something that can be worked around, for instance with a static class or a thread local in this case.
>
> Forwarding to the dev mailing list in case this is something we haven't foreseen.
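A rough sketch of the static-cache workaround mentioned above (assuming schema discovery is deterministic for a given set of options; the class and method names are made up for illustration):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.sql.types.StructType;

// "Static class" workaround: the schema is discovered once per set of options
// and shared across every reader instance created on the driver.
final class SchemaCache {
  private static final Map<Map<String, String>, StructType> CACHE = new ConcurrentHashMap<>();

  private SchemaCache() {}

  static StructType getOrDiscover(Map<String, String> options) {
    // computeIfAbsent runs the (possibly expensive) discovery only once;
    // later reader instances with the same options reuse the result.
    return CACHE.computeIfAbsent(options, opts ->
        new StructType().add("col1", "int").add("col2", "string"));
  }
}

With something like this, readSchema() and planBatchInputPartitions() can both call SchemaCache.getOrDiscover(options), so it no longer matters that Spark invokes them on different reader instances.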
> On Tue, Oct 9, 2018 at 9:39 PM, Shubham Chaurasia <shubh.chaura...@gmail.com> wrote:
>
>> Alright, so it is a big project which uses a SQL store underneath.
>> I extracted the minimal code and made a smaller project out of it, and it is still creating multiple instances.
>>
>> Here is my project:
>>
>> ├── my-datasource.iml
>> ├── pom.xml
>> ├── src
>> │   ├── main
>> │   │   ├── java
>> │   │   │   └── com
>> │   │   │       └── shubham
>> │   │   │           ├── MyDataSource.java
>> │   │   │           └── reader
>> │   │   │               └── MyDataSourceReader.java
>>
>> MyDataSource.java
>> -------------------------------------------------
>>
>> package com.shubham;
>>
>> import com.shubham.reader.MyDataSourceReader;
>> import org.apache.spark.sql.SaveMode;
>> import org.apache.spark.sql.sources.v2.DataSourceOptions;
>> import org.apache.spark.sql.sources.v2.DataSourceV2;
>> import org.apache.spark.sql.sources.v2.ReadSupport;
>> import org.apache.spark.sql.sources.v2.WriteSupport;
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
>> import org.apache.spark.sql.types.StructType;
>>
>> import java.util.Optional;
>>
>> public class MyDataSource implements DataSourceV2, ReadSupport, WriteSupport {
>>
>>   public DataSourceReader createReader(DataSourceOptions options) {
>>     System.out.println("MyDataSource.createReader: Going to create a new MyDataSourceReader");
>>     return new MyDataSourceReader(options.asMap());
>>   }
>>
>>   public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
>>     return Optional.empty();
>>   }
>> }
>>
>> MyDataSourceReader.java
>> -------------------------------------------------
>>
>> package com.shubham.reader;
>>
>> import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
>> import org.apache.spark.sql.sources.v2.reader.InputPartition;
>> import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
>> import org.apache.spark.sql.types.StructType;
>> import org.apache.spark.sql.vectorized.ColumnarBatch;
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Map;
>>
>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>
>>   private Map<String, String> options;
>>   private StructType schema;
>>
>>   public MyDataSourceReader(Map<String, String> options) {
>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>     this.options = options;
>>   }
>>
>>   @Override
>>   public StructType readSchema() {
>>     this.schema = (new StructType())
>>         .add("col1", "int")
>>         .add("col2", "string");
>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>     return this.schema;
>>   }
>>
>>   @Override
>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>     return new ArrayList<>();
>>   }
>> }
>>
>> ----------------------------------------
>> spark-shell output
>> ----------------------------------------
>> scala> spark.read.format("com.shubham.MyDataSource").option("query", "select * from some_table").load.show
>>
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@69fa5536
>> MyDataSourceReader.readSchema: com.shubham.reader.MyDataSourceReader@69fa5536 schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>> MyDataSource.createReader: Going to create a new MyDataSourceReader
>> MyDataSourceReader.MyDataSourceReader: Instantiated....com.shubham.reader.MyDataSourceReader@3095c449
>> MyDataSourceReader.planBatchInputPartitions: com.shubham.reader.MyDataSourceReader@3095c449 schema: null
>> +----+----+
>> |col1|col2|
>> +----+----+
>> +----+----+
>>
>> Here two instances of the reader, MyDataSourceReader@69fa5536 and MyDataSourceReader@3095c449, are being created. Consequently, schema is null in MyDataSourceReader@3095c449.
>>
>> Am I not doing it the correct way?
>>
>> Thanks,
>> Shubham
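For the minimal example above, one option (a sketch only, reusing the names from the example; not the only way, nor a statement about how the API is meant to be used) is for the reader not to depend on readSchema() having been called on the same instance, and instead derive the schema on demand:

package com.shubham.reader;

import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {

  private final Map<String, String> options;
  private StructType schema;  // per-instance cache only; never assumed to be pre-set

  public MyDataSourceReader(Map<String, String> options) {
    this.options = options;
  }

  // Derive (or re-derive) the schema whenever this instance needs it, so it
  // does not matter which reader instance Spark calls for which step.
  private StructType discoverSchema() {
    if (schema == null) {
      schema = new StructType().add("col1", "int").add("col2", "string");
    }
    return schema;
  }

  @Override
  public StructType readSchema() {
    return discoverSchema();
  }

  @Override
  public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
    StructType schemaForScan = discoverSchema();  // valid even on a fresh instance
    System.out.println("planBatchInputPartitions: " + this + " schema: " + schemaForScan);
    return new ArrayList<>();
  }
}

With a change along these lines, the second reader instance in the log above would derive its own schema instead of printing null; if discovery is expensive, the static cache sketched earlier could be used instead.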
>> On Tue, Oct 9, 2018 at 4:43 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>>
>>> I am using v2.4.0-RC2
>>>
>>> The code as is wouldn't run (e.g. planBatchInputPartitions returns null). How are you calling it?
>>>
>>> When I do:
>>>
>>> val df = spark.read.format(mypackage).load().show()
>>>
>>> I am getting a single creation. How are you creating the reader?
>>>
>>> Thanks,
>>> Assaf
>>>
>>> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>>> Sent: Tuesday, October 9, 2018 2:02 PM
>>> To: Mendelson, Assaf; u...@spark.apache.org
>>> Subject: Re: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>>>
>>> Thanks Assaf, did you try with tags/v2.4.0-rc2?
>>>
>>> Full Code:
>>>
>>> MyDataSource is the entry point which simply creates the Reader and Writer:
>>>
>>> public class MyDataSource implements DataSourceV2, WriteSupport, ReadSupport, SessionConfigSupport {
>>>
>>>   @Override
>>>   public DataSourceReader createReader(DataSourceOptions options) {
>>>     return new MyDataSourceReader(options.asMap());
>>>   }
>>>
>>>   @Override
>>>   public Optional<DataSourceWriter> createWriter(String jobId, StructType schema, SaveMode mode, DataSourceOptions options) {
>>>     // creates a dataSourcewriter here..
>>>     return Optional.of(dataSourcewriter);
>>>   }
>>>
>>>   @Override
>>>   public String keyPrefix() {
>>>     return "myprefix";
>>>   }
>>> }
>>>
>>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>>
>>>   StructType schema = null;
>>>   Map<String, String> options;
>>>
>>>   public MyDataSourceReader(Map<String, String> options) {
>>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>>     this.options = options;
>>>   }
>>>
>>>   @Override
>>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>>     // variable this.schema is null here since readSchema() was called on a different instance
>>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>>     // more logic......
>>>     return null;
>>>   }
>>>
>>>   @Override
>>>   public StructType readSchema() {
>>>     // some logic to discover schema
>>>     this.schema = (new StructType())
>>>         .add("col1", "int")
>>>         .add("col2", "string");
>>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>>     return this.schema;
>>>   }
>>> }
>>>
>>> Thanks,
>>> Shubham
>>>
>>> On Tue, Oct 9, 2018 at 3:59 PM Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:
>>>
>>> Could you add a fuller code example? I tried to reproduce it in my environment and I am getting just one instance of the reader…
>>>
>>> Thanks,
>>> Assaf
>>>
>>> From: Shubham Chaurasia [mailto:shubh.chaura...@gmail.com]
>>> Sent: Tuesday, October 9, 2018 9:31 AM
>>> To: u...@spark.apache.org
>>> Subject: DataSourceV2 APIs creating multiple instances of DataSourceReader and hence not preserving the state
>>>
>>> Hi All,
>>>
>>> --Spark built with tags/v2.4.0-rc2
>>>
>>> Consider the following DataSourceReader implementation:
>>>
>>> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>>>
>>>   StructType schema = null;
>>>   Map<String, String> options;
>>>
>>>   public MyDataSourceReader(Map<String, String> options) {
>>>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>>>     this.options = options;
>>>   }
>>>
>>>   @Override
>>>   public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
>>>     // variable this.schema is null here since readSchema() was called on a different instance
>>>     System.out.println("MyDataSourceReader.planBatchInputPartitions: " + this + " schema: " + this.schema);
>>>     // more logic......
>>>     return null;
>>>   }
>>>
>>>   @Override
>>>   public StructType readSchema() {
>>>     // some logic to discover schema
>>>     this.schema = (new StructType())
>>>         .add("col1", "int")
>>>         .add("col2", "string");
>>>     System.out.println("MyDataSourceReader.readSchema: " + this + " schema: " + this.schema);
>>>     return this.schema;
>>>   }
>>> }
>>>
>>> 1) First, readSchema() is called on MyDataSourceReader@instance1, which sets the class variable schema.
>>> 2) Now, when planBatchInputPartitions() is called, it is called on a different instance of MyDataSourceReader, and hence I am not getting the value of schema in planBatchInputPartitions().
>>>
>>> How can I get the value of schema, which was set in the readSchema() method, in the planBatchInputPartitions() method?
>>>
>>> Console Logs:
>>>
>>> scala> mysource.executeQuery("select * from movie").show
>>>
>>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@59ea8f1b
>>> MyDataSourceReader.readSchema: MyDataSourceReader@59ea8f1b schema: StructType(StructField(col1,IntegerType,true), StructField(col2,StringType,true))
>>> MyDataSourceReader.MyDataSourceReader: Instantiated....MyDataSourceReader@a3cd3ff
>>> MyDataSourceReader.planBatchInputPartitions: MyDataSourceReader@a3cd3ff schema: null
>>>
>>> Thanks,
>>> Shubham

--
Ryan Blue
Software Engineer
Netflix