On Thu, Jun 22, 2017 at 7:51 PM, OBones <obo...@free.fr> wrote:

> Hello,
>
> I'm trying to extend Spark so that it can use our own binary format as a
> read-only source for pipeline-based computations.
> I already have a Java class that gives me enough elements to build a
> complete StructType with enough metadata (NominalAttribute, for instance).
> It also gives me the row count for the file and methods to read any given
> cell, as it is basically a giant array of values stored on disk.
> In order for this to plug properly into the Spark framework, I looked at
> the CSV source code and thus created a DefaultSource class in my package,
> this way:
>
> class DefaultSource
>   extends RelationProvider
>   with DataSourceRegister {
>
>   override def shortName(): String = "binfile"
>
>   private def checkPath(parameters: Map[String, String]): String = {
>     parameters.getOrElse("path",
>       sys.error("'path' must be specified for BinFile data."))
>   }
>   override def createRelation(
>       sqlContext: SQLContext,
>       parameters: Map[String, String]): BaseRelation = {
>     val path = checkPath(parameters)
>     BinFileRelation(Some(path))(sqlContext)
>   }
> }
>
> I also created the BinFileRelation like this:
>
> case class BinFileRelation /*protected[spark]*/ (
>     location: Option[String])(@transient val sqlContext: SQLContext)
>   extends BaseRelation with TableScan {
>
>   private val reader = new BinFileReader(location.getOrElse(""))
>   override val schema: StructType = {
>     // retrieve column info from the reader and transform it into a valid
>     // StructType with two columns: the first being the label, the second
>     // being the vector of features
>   }
>
>   override def buildScan: RDD[Row] = {
>     // I have no idea what to return here, so null for now.
>     null
>   }
> }
>
> So, as you see, I managed to create the required code to return a valid
> schema, and was also able to write unit tests for it.
> I copied "protected[spark]" from the CSV implementation, but I commented
> it out because it prevents the code from compiling and it does not seem
> to be required.
> And most importantly, I have no idea how to build the RDD to be returned
> by buildScan so that the data stored on disk is not loaded into memory
> all at once (it may be very large, hundreds of millions of rows).
>


You are effectively building a data source for Spark.

You can subclass the RDD class and create your own RDD, which is what
buildScan above will return.
This RDD class must implement a compute() method that returns an Iterator
over the rows of a partition (plus a getPartitions method that tells Spark
how to split the file into chunks of work).
iterator.next() will then be invoked by Spark as the job executes, so rows
are read lazily rather than loaded up front.

Look at how the Cassandra connector does it
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L354
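
To make it concrete, here is a rough, untested sketch for your BinFile
case. I'm guessing at the reader's API: rowCount, readLabel and
readFeatures below are placeholders for whatever your BinFileReader
actually exposes.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// One partition covers a contiguous range of rows in the file.
class BinFilePartition(val index: Int, val startRow: Long, val endRow: Long)
  extends Partition

class BinFileRDD(sc: SparkContext, path: String, numPartitions: Int)
  extends RDD[Row](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    // rowCount is an assumed accessor on your reader.
    val totalRows: Long = new BinFileReader(path).rowCount
    val rowsPerPartition = (totalRows + numPartitions - 1) / numPartitions
    Array.tabulate[Partition](numPartitions) { i =>
      new BinFilePartition(i, i * rowsPerPartition,
        math.min((i + 1) * rowsPerPartition, totalRows))
    }
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val part = split.asInstanceOf[BinFilePartition]
    // Open the reader inside the task so only the path string is serialized
    // to the executors, not the reader itself.
    val reader = new BinFileReader(path)
    (part.startRow until part.endRow).iterator.map { rowIdx =>
      // readLabel / readFeatures stand in for your per-cell accessors and
      // should return values matching the two columns of your schema.
      Row(reader.readLabel(rowIdx), reader.readFeatures(rowIdx))
    }
  }
}

Your buildScan then becomes something like

  override def buildScan(): RDD[Row] =
    new BinFileRDD(sqlContext.sparkContext, location.getOrElse(""), numPartitions = 4)

Because the rows come out of an iterator one at a time, Spark pulls them
as it needs them and the file is never loaded into memory all at once.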




> I read the documentation here:
> https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html
> It says "Concrete implementation should inherit from one of the descendant
> Scan classes" but I could not find any of those descendants in the
> documentation nor in the source code.
>

The scan classes referred to here are these, in addition to the
CatalystScan trait at the bottom of the same file:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L245-L277
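
For quick reference, those traits look roughly like this in the Spark 2.x
source:

trait TableScan {
  def buildScan(): RDD[Row]
}

trait PrunedScan {
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}

trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

Since you only need a full scan of the file, the TableScan you already mix
into BinFileRelation is enough; PrunedScan and PrunedFilteredScan only
matter if you want Spark to push column pruning or filter predicates down
into your reader.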




> Looking further in the code for "BaseRelation" I found the JDBCRelation
> class that implements buildScan by calling JDBCRDD.scanTable, so I went
> looking at that method, which basically creates an instance of the private
> JDBCRDD class as well.
> That class extends RDD[InternalRow], so it looks to me as if I should do
> the same for my own use.
> However, I'm not sure how to implement the compute method for a simple
> read as mentioned above.
>
> Any help would be greatly appreciated.
>
