Just remove `val sparkConf = sc.getConf`; `RDD.sparkContext` is already
defined, so you can call it directly in driver-side methods.
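
To make the difference concrete, here is a minimal sketch in plain Scala, with no Spark dependency, of the capture behaviour described in the quoted thread. All names in it (`FakeContext`, `FakeRDD`, `SafeRDD`, `LeakyRDD`, `CaptureDemo`) are hypothetical stand-ins, and it assumes the base class keeps its context in a `@transient` field behind an accessor, which as far as I know is roughly what `RDD` does for `sparkContext`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext(val appName: String)

// Hypothetical stand-in for RDD: the context lives in a @transient field and
// is exposed through an accessor, so Java serialization skips it.
abstract class FakeRDD(@transient private val _ctx: FakeContext) extends Serializable {
  def context: FakeContext = _ctx
}

// `ctx` is only forwarded to the superclass constructor, so the compiler
// does not store it as a field of SafeRDD.
class SafeRDD(ctx: FakeContext) extends FakeRDD(ctx) {
  def describe: String = context.appName
}

// `ctx` is referenced inside a method body, so the compiler promotes it to a
// non-transient private field that serialization will try to write.
class LeakyRDD(ctx: FakeContext) extends FakeRDD(ctx) {
  def describe: String = ctx.appName
}

object CaptureDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext("demo")
    println(s"SafeRDD serializes:  ${serializes(new SafeRDD(ctx))}")
    println(s"LeakyRDD serializes: ${serializes(new LeakyRDD(ctx))}")
  }
}
```

Running `CaptureDemo.main` should report that `SafeRDD` serializes and `LeakyRDD` does not. The only difference between the two classes is whether a method body touches the constructor parameter or the inherited accessor.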

On Thu, Mar 1, 2018 at 3:13 AM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Wenchen,
>
>
>
> Thank you very much for your prompt reply and pointer!
>
>
>
> Thinking it through, it makes sense: since my custom RDD is
> instantiated on the driver, I should get whatever I need from the
> SparkContext there and assign it to instance variables.
>
>
>
> However, `RDD.sparkContext` and the Scala magic of class variables did
> not work.
>
>
>
> Here's what worked, based on your tip:
>
>
>
>
> class MyDataSourceRDD(sc: SparkContext, conf: Map[String, String])
>   extends RDD[Row](sc, Nil) {
>
>   val sparkConf = sc.getConf
>
>   override def getPartitions: Array[org.apache.spark.Partition] = {
>     sparkConf.getAll.foreach(println)
>     val numPartitions = conf.getOrElse("spark.mydata.numpartitions", "1").toInt
>     val rowsPerPartition = conf.getOrElse("spark.mydata.rowsperpartition", "3").toInt
>     val partitions = 0 until numPartitions map(partition =>
>       new MyDataSourcePartition(partition, rowsPerPartition))
>     partitions.toArray
>   }
>
>   override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
>     val myDataSourcePartition = split.asInstanceOf[MyDataSourcePartition]
>     val partitionId = myDataSourcePartition.index
>     val rows = myDataSourcePartition.rowCount
>     val partitionData = 1 to rows map(r =>
>       Row(s"Partition: ${partitionId}, row ${r} of ${rows}"))
>     partitionData.iterator
>   }
>
> }
>
>
>
>
>
> *From: *Wenchen Fan <cloud0...@gmail.com>
> *Date: *Wednesday, February 28, 2018 at 12:25 PM
> *To: *"Thakrar, Jayesh" <jthak...@conversantmedia.com>
> *Cc: *"dev@spark.apache.org" <dev@spark.apache.org>
> *Subject: *Re: SparkContext - parameter for RDD, but not serializable,
> why?
>
>
>
> My understanding:
>
>
>
> An RDD is also a driver-side object, like SparkContext; it works like a
> handle to your distributed data on the cluster.
>
>
>
> However, `RDD.compute` (defines how to produce data for each partition)
> needs to be executed on the remote nodes. It's more convenient to make RDD
> serializable, and transfer RDD object to remote nodes and call `compute`.
>
>
>
> So you can use SparkContext inside an RDD, as long as you don't use it in
> methods that are going to be executed on remote nodes, like `RDD.compute`.
> And you should call `RDD.sparkContext` instead of using `sc` directly,
> because referring to `sc` inside a method turns it from a constructor
> parameter into a class member variable (kind of a hacky part of Scala),
> which will then be serialized.
>
>
>
> On Thu, Mar 1, 2018 at 2:03 AM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> Hi All,
>
>
>
> I was just toying with creating a very rudimentary RDD datasource to
> understand the inner workings of RDDs.
>
>
>
> It seems that one of the constructors for RDD has a parameter of type
> SparkContext, but it (apparently) exists on the driver only and is not
> serializable.
>
>
>
> Consequently, any attempt to use the SparkContext parameter inside your
> custom RDD generates a runtime error about it not being serializable.
>
>
>
> Just wondering what is the rationale behind this?
>
> I.e. if it is not serializable/usable, why make it a parameter?
>
> And if it needs to be a parameter, why not make it serializable (is it
> even possible?)
>
>
>
> Below is my working code where I test a custom RDD.
>
>
>
> scala> val mydata = spark.read.format("MyDataSourceProvider").load()
> mydata: org.apache.spark.sql.DataFrame = [mydataStr: string]
>
> scala> mydata.show(10, false)
> +------------------------+
> |mydataStr               |
> +------------------------+
> |Partition: 0, row 1 of 3|
> |Partition: 0, row 2 of 3|
> |Partition: 0, row 3 of 3|
> +------------------------+
>
> scala>
>
>
>
>
>
> ///// custom RDD
>
>
>
>
> import org.apache.spark.internal.Logging
> import org.apache.spark.sql.{Row, SQLContext}
> import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister,
>   RelationProvider, TableScan}
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
> import org.apache.spark.{Partition, SparkContext, TaskContext}
> import org.apache.spark.rdd.RDD
>
>
> class MyDataSourceProvider extends DataSourceRegister
>   with RelationProvider with Logging {
>
>   override def shortName(): String = { "mydata" }
>
>   private val myDataSchema: StructType = new StructType(
>     Array[StructField](new StructField("mydataStr", StringType, false)))
>
>   def sourceSchema(sqlContext: SQLContext,
>                    schema: Option[StructType],
>                    providerName: String,
>                    parameters: Map[String, String]): (String, StructType) = {
>     (shortName(), schema.get)
>   }
>
>   override def createRelation(sqlContext: SQLContext,
>                               parameters: Map[String, String]): BaseRelation = {
>     new MyDataRelation(sqlContext, myDataSchema, parameters)
>   }
>
> }
>
>
> class MyDataRelation(override val sqlContext: SQLContext,
>                      override val schema: StructType,
>                      params: Map[String, String])
>   extends BaseRelation with TableScan with Logging {
>
>   override def buildScan(): org.apache.spark.rdd.RDD[Row] = {
>     val rdd = new MyDataSourceRDD(sqlContext.sparkContext, sqlContext.getAllConfs)
>     rdd
>   }
>
>   override def needConversion = true
>
> }
>
>
> class MyDataSourceRDD(sc: SparkContext, conf: Map[String, String])
>   extends RDD[Row](sc, Nil) {
>
>   override def getPartitions: Array[org.apache.spark.Partition] = {
>     // sc.getConf.getAll.foreach(println) - this fails with a "SparkContext not
>     // serializable" error. So what use is this parameter?!
>     val numPartitions = conf.getOrElse("spark.mydata.numpartitions", "1").toInt
>     val rowsPerPartition = conf.getOrElse("spark.mydata.rowsperpartition", "3").toInt
>     val partitions = 0 until numPartitions map(partition =>
>       new MyDataSourcePartition(partition, rowsPerPartition))
>     partitions.toArray
>   }
>
>   override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
>     val myDataSourcePartition = split.asInstanceOf[MyDataSourcePartition]
>     val partitionId = myDataSourcePartition.index
>     val rows = myDataSourcePartition.rowCount
>     val partitionData = 1 to rows map(r =>
>       Row(s"Partition: ${partitionId}, row ${r} of ${rows}"))
>     partitionData.iterator
>   }
>
> }
>
>
> class MyDataSourcePartition(partitionId: Int, rows: Int)
>   extends Partition with Serializable {
>
>   override def index: Int = partitionId
>
>   def rowCount: Int = rows
>
> }
>
>
>
>
>
