Hi Ted,

I changed

def customable(partitioner: Partitioner): RDD[(K, V)] = self.withScope {

to

def customable(partitioner: Partitioner): MyRDD[K, V] = self.withScope {
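For reference, the full method in PairRDDFunctions.scala now looks roughly like this (the body is unchanged from my earlier mail; only the declared return type differs):

def customable(partitioner: Partitioner): MyRDD[K, V] = self.withScope {
  // Declaring the return type as MyRDD[K, V] (instead of the base RDD[(K, V)])
  // is what lets spark-shell see customMethod without a cast.
  new MyRDD[K, V](self, partitioner)
}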
After rebuilding the whole Spark project (since that takes a long time, I didn't do it as you suggested at first), it also works. Thanks

On Mon, Mar 28, 2016 at 11:01 AM, Tenghuan He <tenghua...@gmail.com> wrote:

> Thanks very much Ted
>
> I added MyRDD.scala to the Spark source code and rebuilt the whole Spark project;
> using myrdd.asInstanceOf[MyRDD] doesn't work. It seems that MyRDD is not exposed
> to the spark-shell.
>
> Finally I wrote a separate Spark application, added MyRDD.scala to that project,
> and then the custom method can be called in the main function and it works.
> I misunderstood the usage of a custom RDD: it does not have to be written into the
> Spark project like UnionRDD and CogroupedRDD; you can just add it to your own project.
>
> On Mon, Mar 28, 2016 at 4:28 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> My interpretation is that the variable myrdd is of type RDD as far as the REPL
>> is concerned, though it was an instance of MyRDD.
>>
>> Using asInstanceOf in spark-shell should allow you to call your custom method.
>>
>> Here is the declaration of RDD:
>>
>> abstract class RDD[T: ClassTag](
>>
>> You can extend RDD and include your custom logic in the subclass.
>>
>> On Sun, Mar 27, 2016 at 10:14 AM, Tenghuan He <tenghua...@gmail.com> wrote:
>>
>>> Thanks Ted,
>>>
>>> but I have a doubt: as the spark-shell code (line 4) shows, myrdd is already
>>> a MyRDD. Does that not make sense?
>>>
>>> 1 scala> val part = new org.apache.spark.HashPartitioner(10)
>>> 2 scala> val baseRDD = sc.parallelize(1 to 100000).map(x => (x, "hello")).partitionBy(part).cache()
>>> 3 scala> val myrdd = baseRDD.customable(part) // here customable is a method added to the abstract RDD to create MyRDD
>>> 4 myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable at
>>> 5 <console>:28
>>> 6 scala> myrdd.customMethod(bulk)
>>> 7 error: value customMethod is not a member of org.apache.spark.rdd.RDD[(Int, String)]
>>>
>>> On Mon, Mar 28, 2016 at 12:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> bq. def customable(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
>>>>
>>>> In the above, you declare the return type as RDD, while you actually intended
>>>> to declare MyRDD as the return type.
>>>> Or, you can cast myrdd to MyRDD in spark-shell.
>>>>
>>>> BTW I don't think it is good practice to add a custom method to the base RDD.
>>>>
>>>> On Sun, Mar 27, 2016 at 9:44 AM, Tenghuan He <tenghua...@gmail.com> wrote:
>>>>
>>>>> Hi Ted,
>>>>>
>>>>> The code is run in spark-shell:
>>>>>
>>>>> scala> val part = new org.apache.spark.HashPartitioner(10)
>>>>> scala> val baseRDD = sc.parallelize(1 to 100000).map(x => (x, "hello")).partitionBy(part).cache()
>>>>> scala> val myrdd = baseRDD.customable(part) // here customable is a method added to the abstract RDD to create MyRDD
>>>>> myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable at <console>:28
>>>>> scala> myrdd.customMethod(bulk)
>>>>> error: value customMethod is not a member of org.apache.spark.rdd.RDD[(Int, String)]
>>>>>
>>>>> and the customable method in PairRDDFunctions.scala is
>>>>>
>>>>> def customable(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
>>>>>   new MyRDD[K, V](self, partitioner)
>>>>> }
>>>>>
>>>>> Thanks :)
>>>>>
>>>>> On Mon, Mar 28, 2016 at 12:28 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Can you show the full stack trace (or top 10 lines) and the snippet
>>>>>> using your MyRDD?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sun, Mar 27, 2016 at 9:22 AM, Tenghuan He <tenghua...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I am creating a custom RDD which extends RDD and adds a custom method;
>>>>>>> however, the custom method cannot be found.
>>>>>>> The custom RDD looks like the following:
>>>>>>>
>>>>>>> class MyRDD[K, V](
>>>>>>>     var base: RDD[(K, V)],
>>>>>>>     part: Partitioner
>>>>>>>   ) extends RDD[(K, V)](base.context, Nil) {
>>>>>>>
>>>>>>>   def customMethod(bulk: ArrayBuffer[(K, (V, Int))]): MyRDD[K, V] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>>
>>>>>>>   override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>>
>>>>>>>   override protected def getPartitions: Array[Partition] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>>
>>>>>>>   override protected def getDependencies: Seq[Dependency[_]] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> In spark-shell, it turns out that the overridden methods work well, but
>>>>>>> calling myrdd.customMethod(bulk) throws:
>>>>>>> <console>:33: error: value customMethod is not a member of
>>>>>>> org.apache.spark.rdd.RDD[(Int, String)]
>>>>>>>
>>>>>>> Can anyone tell why the custom method cannot be found?
>>>>>>> Or do I have to add customMethod to the abstract RDD and then override it
>>>>>>> in the custom RDD?
>>>>>>>
>>>>>>> PS: Spark version: 1.5.1
>>>>>>>
>>>>>>> Thanks & Best regards
>>>>>>>
>>>>>>> Tenghuan
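For completeness, a minimal spark-shell sketch of the cast-based workaround Ted suggested. It assumes the customable method from the thread exists and that MyRDD[K, V] is actually visible to the shell (for example, shipped in your own application jar); per the thread, it was not visible when MyRDD was compiled into the Spark source itself. The bulk value is a hypothetical argument matching customMethod's signature.

scala> import scala.collection.mutable.ArrayBuffer
scala> val part = new org.apache.spark.HashPartitioner(10)
scala> val baseRDD = sc.parallelize(1 to 100000).map(x => (x, "hello")).partitionBy(part).cache()
scala> val bulk = ArrayBuffer.empty[(Int, (String, Int))]  // matches ArrayBuffer[(K, (V, Int))]
scala> // The cast gives the value the static type MyRDD[Int, String], so customMethod resolves.
scala> val myrdd = baseRDD.customable(part).asInstanceOf[MyRDD[Int, String]]
scala> myrdd.customMethod(bulk)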