[How-To][SQL] Create a dataframe inside the TableScan.buildScan method of a relation

2017-06-22 Thread OBones

Hello,

I'm trying to extend Spark so that it can use our own binary format as a 
read-only source for pipeline-based computations.
I already have a Java class that gives me enough elements to build a 
complete StructType with the appropriate metadata (NominalAttribute, for 
instance).
It also gives me the row count for the file and methods to read any 
given cell, as the file is basically a giant array of values stored on disk.
In order for this to plug properly into the Spark framework, I looked at 
the CSV source code and created a DefaultSource class in my package, 
like this:


import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

class DefaultSource
  extends RelationProvider
  with DataSourceRegister {

  override def shortName(): String = "binfile"

  private def checkPath(parameters: Map[String, String]): String = {
    parameters.getOrElse("path", sys.error("'path' must be specified for BinFile data."))
  }

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val path = checkPath(parameters)
    BinFileRelation(Some(path))(sqlContext)
  }
}
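
As a quick usage sketch (assuming a SparkSession named spark, that the 
"binfile" short name is registered through the usual META-INF/services 
entry, and a hypothetical file path):

val df = spark.read
  .format("binfile")                  // or the fully qualified name of the package containing DefaultSource
  .option("path", "/data/sample.bin") // hypothetical path
  .load()
df.printSchema()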

I also created the BinFileRelation like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

case class BinFileRelation /*protected[spark]*/ (
    location: Option[String])(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  private val reader = new BinFileReader(location.getOrElse(""))

  override val schema: StructType = {
    // retrieve column infos from the reader and transform them into a valid
    // StructType with two columns, the first being the label, the second
    // being the vector of features
  }

  override def buildScan(): RDD[Row] = {
    // I have no idea what to return here, so null for now.
    null
  }
}
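
For illustration, the schema override could be built along these lines 
(a sketch only: reader.labelValues is a hypothetical accessor returning 
the nominal label values):

import org.apache.spark.ml.attribute.NominalAttribute
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

override val schema: StructType = {
  // NominalAttribute metadata for the label column; reader.labelValues is hypothetical.
  val labelMetadata = NominalAttribute.defaultAttr
    .withName("label")
    .withValues(reader.labelValues: _*)
    .toMetadata()

  StructType(Seq(
    StructField("label", DoubleType, nullable = false, labelMetadata),
    StructField("features", VectorType, nullable = false)
  ))
}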

So, as you see, I managed to create the required code to return a valid 
schema, and was also able to write unit tests for it.
I copied "protected[spark]" from the CSV implementation, but I commented 
it out because it prevents compilation from succeeding and it does not 
seem to be required.
And most importantly, I have no idea how to create a valid dataframe to 
be returned by buildScan so that the data stored on disk is not loaded 
into memory all at once (it may be very large, hundreds of millions of 
rows).
I read the documentation here: 
https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html
It says "Concrete implementation should inherit from one of the 
descendant Scan classes" but I could not find any of those descendant 
classes in the documentation or in the source code.


Looking further into the code for BaseRelation, I found the JDBCRelation 
class, which implements buildScan by calling JDBCRDD.scanTable, so I went 
looking at this method, which basically creates an instance of the 
private JDBCRDD class.
That class extends RDD[InternalRow], so it looks to me as if I should do 
the same for my own use.
However, I'm not sure how to implement the compute method for a simple 
read as mentioned above.


Any help would be greatly appreciated.




Re: [How-To][SQL] Create a dataframe inside the TableScan.buildScan method of a relation

2017-06-27 Thread OBones

Sandeep Joshi wrote:


So, as you see, I managed to create the required code to return a
valid schema, and was also able to write unit tests for it.
I copied "protected[spark]" from the CSV implementation, but I
commented it out because it prevents compilation from succeeding
and it does not seem to be required.
And most importantly, I have no idea how to create a valid
dataframe to be returned by buildScan so that the data stored on
disk is not loaded into memory all at once (it may be very large,
hundreds of millions of rows).



You are effectively building a data source for Spark.

You can subclass the RDD class and create your own RDD, which will be 
returned by buildScan above.
This RDD class must implement a compute() method which returns an 
Iterator.

The iterator.next() will then be invoked by Spark as it executes.

Look at how the Cassandra connector does it
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala#L354

Ah yes, that makes sense. Somehow, I was fixating on creating an RDD[Row] 
instance instead of deriving my own class from RDD[Row].
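
For completeness, here is a minimal sketch of what such a derived class 
could look like (BinFilePartition, rowsPerPartition and the BinFileReader 
accessors rowCount, readLabel and readFeatures are hypothetical names 
standing in for my own reader API):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Hypothetical partition covering a contiguous range of rows in the file.
case class BinFilePartition(index: Int, startRow: Long, endRow: Long) extends Partition

class BinFileRDD(
    sc: SparkContext,
    path: String,
    rowsPerPartition: Long)
  extends RDD[Row](sc, Nil) {

  override def getPartitions: Array[Partition] = {
    val totalRows = new BinFileReader(path).rowCount // hypothetical accessor
    (0L until totalRows by rowsPerPartition).zipWithIndex.map { case (start, i) =>
      BinFilePartition(i, start, math.min(start + rowsPerPartition, totalRows))
    }.toArray[Partition]
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    val p = split.asInstanceOf[BinFilePartition]
    // Open the reader on the executor; rows are produced lazily by the iterator,
    // so the file is never materialized in memory all at once.
    val reader = new BinFileReader(path)
    (p.startRow until p.endRow).iterator.map { rowIndex =>
      // readLabel/readFeatures are hypothetical cell accessors returning the label
      // and the feature vector for a given row.
      Row(reader.readLabel(rowIndex), reader.readFeatures(rowIndex))
    }
  }
}

buildScan would then simply return 
new BinFileRDD(sqlContext.sparkContext, location.getOrElse(""), someRowsPerPartition).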



I read the documentation here:

https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/sources/BaseRelation.html


It says "Concrete implementation should inherit from one of the
descendant Scan classes" but I could not find those any of those
descendant in the documentation nor in the source code.


The scan classes referred to here are these, in addition to the 
CatalystScan trait at the bottom of the file:


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L245-L277

Great, that clarifies the situation.

With that in mind, I was able to create the complete set of classes and 
work with my custom format.

Thanks for your help.




More efficient RDD.count() implementation

2017-07-12 Thread OBones

Hello,

As I have written my own data source, I also wrote a custom RDD[Row] 
implementation to provide getPartitions and compute overrides.
This works very well, but doing some performance analysis, I see that for 
any given pipeline fit operation, a fair amount of time is spent in the 
RDD.count method.
Its default implementation in RDD.scala is to go through the entire 
iterator, which in my case is counterproductive because I already know 
the number of rows in the RDD and in any partition returned by 
getPartitions.
As an initial attempt, I declared the following in my custom RDD 
implementation:


  override def count(): Long = { reader.RowCount }

but this never gets called, which upon further inspection makes perfect 
sense. Indeed, the internal code creates its own RDDs, derived from mine, 
for the work it has to do. And this is where I'm a bit stuck, because I 
have no idea how to override this creation.
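
To illustrate (a hedged sketch, assuming the count override above is added 
to the hypothetical BinFileRDD from the earlier message): count() only 
short-circuits when it is called on my RDD directly; as soon as a 
transformation is applied, the call lands on a derived RDD such as a 
MapPartitionsRDD and falls back to the default iterator-based 
implementation.

val rdd = new BinFileRDD(sc, "/data/sample.bin", 1000000L) // hypothetical path
rdd.count()               // hits the overridden count, returns the stored row count
rdd.map(identity).count() // runs the default count on a MapPartitionsRDD, scanning all rows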


Here is a call stack for a GBTRegressor run, but it's quite similar for 
RandomForestRegressor or DecisionTreeRegressor.


org.apache.spark.rdd.RDD.count(RDD.scala:1158)
org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:116)
org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:105)
org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:125)
org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:291)
org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:49)
org.apache.spark.ml.regression.GBTRegressor.train(GBTRegressor.scala:154)
org.apache.spark.ml.regression.GBTRegressor.train(GBTRegressor.scala:58)
org.apache.spark.ml.Predictor.fit(Predictor.scala:96)

Any suggestion would be much appreciated.

Regards
