On Thu, Jun 22, 2017 at 7:51 PM, OBones <obo...@free.fr> wrote:
> Hello,
>
> I'm trying to extend Spark so that it can use our own binary format as a
> read-only source for pipeline-based computations.
> I already have a Java class that gives me enough elements to build a
> complete StructType with enough metadata (NominalAttribute, for instance).
> It also gives me the row count for the file and methods to read any given
> cell, as it basically is a giant array of values stored on disk.
> In order for this to plug properly into the Spark framework, I looked at the
> CSV source code and thus created a DefaultSource class in my package, this
> way:
>
> class DefaultSource
>   extends RelationProvider
>   with DataSourceRegister {
>
>   override def shortName(): String = "binfile"
>
>   private def checkPath(parameters: Map[String, String]): String = {
>     parameters.getOrElse("path", sys.error("'path' must be specified for BinFile data."))
>   }
>
>   override def createRelation(
>       sqlContext: SQLContext,
>       parameters: Map[String, String]): BaseRelation = {
>     val path = checkPath(parameters)
>     BinFileRelation(Some(path))(sqlContext)
>   }
> }
>
> I also created the BinFileRelation like this:
>
> case class BinFileRelation /*protected[spark]*/ (
>     location: Option[String])(@transient val sqlContext: SQLContext)
>   extends BaseRelation with TableScan {
>
>   private val reader = new BinFileReader(location.getOrElse(""))
>
>   override val schema: StructType = {
>     // retrieve column info from the reader and turn it into a valid StructType
>     // with two columns, the first being the label, the second being the vector
>     // of features
>   }
>
>   override def buildScan: RDD[Row] = {
>     // I have no idea what to return here, so null for now.
>     null
>   }
> }
>
> So, as you see, I managed to create the required code to return a valid
> schema, and was also able to write unit tests for it.
> I copied "protected[spark]" from the CSV implementation, but I commented
> it out because it prevents compilation from succeeding and it does not
> seem to be required.
> And most importantly, I have no idea how to create a valid DataFrame to be
> returned by buildScan so that the data stored on disk is not loaded into
> memory all at once (it may be very large, hundreds of millions of rows).
>
You are effectively building a data source for Spark. You can subclass the RDD class and create your own RDD, which will be returned by buildScan above. This RDD class must implement a compute() method that returns an Iterator; iterator.next() will then be invoked by Spark as it executes. Look at how the Cassandra connector does it:
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L354

> I read the documentation here:
> https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html
> It says "Concrete implementation should inherit from one of the descendant
> Scan classes" but I could not find any of those descendants in the
> documentation or in the source code.

The scan classes referred to here are these, in addition to the CatalystScan at the bottom of the file:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L245-L277

> Looking further in the code for "BaseRelation" I found the JDBCRelation
> class that implements buildScan by calling JDBCRDD.scanTable, so I went
> looking at that method, which basically creates an instance of the private
> JDBCRDD class as well.
> This class extends RDD[InternalRow], so it looks to me as if I should do
> the same for my own use.
> However, I'm not sure how to implement the compute method for a simple
> read as mentioned above.
>
> Any help would be greatly appreciated.
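As for implementing compute for a simple read, here is a rough, untested sketch of what such an RDD could look like in your case. It assumes your BinFileReader can report the row count and read individual cells by row index; the names rowCount, readLabel, readFeatures and close below are purely illustrative placeholders, so adapt them to whatever your reader actually exposes:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// One partition covers a contiguous range of rows [start, end).
case class BinFilePartition(index: Int, start: Long, end: Long) extends Partition

class BinFileRDD(
    sc: SparkContext,
    path: String,
    rowCount: Long,
    numPartitions: Int)
  extends RDD[Row](sc, Nil) {

  override protected def getPartitions: Array[Partition] = {
    val rowsPerPartition = (rowCount + numPartitions - 1) / numPartitions
    (0 until numPartitions).map { i =>
      val start = i * rowsPerPartition
      val end = math.min(start + rowsPerPartition, rowCount)
      BinFilePartition(i, start, end): Partition
    }.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val part = split.asInstanceOf[BinFilePartition]
    // Open one reader per task, on the executor, so nothing is read on the driver.
    val reader = new BinFileReader(path)
    context.addTaskCompletionListener { _ => reader.close() } // assuming the reader needs closing
    new Iterator[Row] {
      private var current = part.start
      override def hasNext: Boolean = current < part.end
      override def next(): Row = {
        // readLabel/readFeatures stand in for your per-cell accessors.
        val row = Row(reader.readLabel(current), reader.readFeatures(current))
        current += 1
        row
      }
    }
  }
}

// Then, in BinFileRelation, buildScan just returns it; pick a partition count
// that suits your file layout (8 here is arbitrary).
override def buildScan(): RDD[Row] =
  new BinFileRDD(sqlContext.sparkContext, location.getOrElse(""), reader.rowCount, numPartitions = 8)

Because compute only hands Spark an iterator, rows are pulled from disk on demand, one partition at a time, so the whole file is never materialized in memory at once.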