Thanks very much, Ted.

I added MyRDD.scala to the Spark source code and rebuilt the whole Spark
project, but using myrdd.asInstanceOf[MyRDD] doesn't work. It seems that
MyRDD is not exposed to the spark-shell.

Finally I wrote a separate Spark application and added MyRDD.scala to that
project; the custom method can then be called in the main function and it
works.
I had misunderstood how a custom RDD is used: it does not have to be added
to the Spark project itself like UnionRDD or CoGroupedRDD; you can simply
add it to your own project.
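
For reference, a rough sketch of what the standalone application looks like
(the object name and the bulk value are placeholders; MyRDD and customMethod
are the ones shown further down in this thread):

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object MyRDDDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MyRDDDemo"))
    val part = new HashPartitioner(10)
    val baseRDD = sc.parallelize(1 to 100000)
      .map(x => (x, "hello"))
      .partitionBy(part)
      .cache()

    // MyRDD is compiled into this project, so its type (and customMethod)
    // is visible at compile time; no cast is needed.
    val myrdd = new MyRDD[Int, String](baseRDD, part)
    val bulk = ArrayBuffer.empty[(Int, (String, Int))]
    myrdd.customMethod(bulk)

    sc.stop()
  }
}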

On Mon, Mar 28, 2016 at 4:28 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> My interpretation is that the variable myrdd is of type RDD to the REPL,
> even though it is an instance of MyRDD.
>
> Using asInstanceOf in spark-shell should allow you to call your custom
> method.
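>
> For example, a rough sketch of that cast (assuming MyRDD[K, V] is visible
> to the REPL, and reusing the bulk value from your session):
>
>   val typed = myrdd.asInstanceOf[MyRDD[Int, String]]
>   typed.customMethod(bulk)  // now resolves against MyRDD instead of RDD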
>
> Here is the declaration of RDD:
>
> abstract class RDD[T: ClassTag](
>
> You can extend RDD and include your custom logic in the subclass.
>
> On Sun, Mar 27, 2016 at 10:14 AM, Tenghuan He <tenghua...@gmail.com>
> wrote:
>
>> Thanks Ted,
>>
>> but I have a doubt: as the code above (line 4) in the spark-shell shows,
>> myrdd is already a MyRDD, so shouldn't the call work?
>>
>> 1 scala> val part = new org.apache.spark.HashPartitioner(10)
>> 2 scala> val baseRDD = sc.parallelize(1 to 100000).map(x => (x,
>> "hello")).partitionBy(part).cache()
>> 3 scala> val myrdd = baseRDD.customable(part)  // here customable is a
>> method added to the abstract RDD to create MyRDD
>> 4 myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable at
>> 5 <console>:28
>> 6 scala> myrdd.customMethod(bulk)
>> 7 error: value customMethod is not a member of
>> org.apache.spark.rdd.RDD[(Int, String)]
>>
>> On Mon, Mar 28, 2016 at 12:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq.   def customable(partitioner: Partitioner): RDD[(K, V)] =
>>> self.withScope {
>>>
>>> In the above, you declare the return type as RDD, while you actually
>>> intended MyRDD to be the return type.
>>> Alternatively, you can cast myrdd to MyRDD in spark-shell.
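>>>
>>> A rough sketch of that change, keeping the body of customable as you
>>> wrote it:
>>>
>>>   def customable(partitioner: Partitioner): MyRDD[K, V] = self.withScope {
>>>     new MyRDD[K, V](self, partitioner)
>>>   }
>>>
>>> With MyRDD declared as the return type, customMethod resolves without a
>>> cast.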
>>>
>>> BTW I don't think it is good practice to add a custom method to the base RDD.
>>>
>>> On Sun, Mar 27, 2016 at 9:44 AM, Tenghuan He <tenghua...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ted,
>>>>
>>>> The code is running in spark-shell:
>>>>
>>>> scala> val part = new org.apache.spark.HashPartitioner(10)
>>>> scala> val baseRDD = sc.parallelize(1 to 100000).map(x => (x,
>>>> "hello")).partitionBy(part).cache()
>>>> scala> val myrdd = baseRDD.customable(part)  // here customable is a
>>>> method added to the abstract RDD to create MyRDD
>>>> myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable at
>>>> <console>:28
>>>> scala> myrdd.customMethod(bulk)
>>>> error: value customMethod is not a member of
>>>> org.apache.spark.rdd.RDD[(Int, String)]
>>>>
>>>> and the customable method in PairRDDFunctions.scala is
>>>>
>>>>   def customable(partitioner: Partitioner): RDD[(K, V)] =
>>>> self.withScope {
>>>>     new MyRDD[K, V](self, partitioner)
>>>>   }
>>>>
>>>> Thanks:)
>>>>
>>>> On Mon, Mar 28, 2016 at 12:28 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Can you show the full stack trace (or top 10 lines) and the snippet
>>>>> using your MyRDD ?
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sun, Mar 27, 2016 at 9:22 AM, Tenghuan He <tenghua...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>>     I am creating a custom RDD which extends RDD and adds a custom
>>>>>> method; however, the custom method cannot be found.
>>>>>>     The custom RDD looks like the following:
>>>>>>
>>>>>> import scala.collection.mutable.ArrayBuffer
>>>>>>
>>>>>> import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}
>>>>>> import org.apache.spark.rdd.RDD
>>>>>>
>>>>>> class MyRDD[K, V](
>>>>>>     var base: RDD[(K, V)],
>>>>>>     part: Partitioner
>>>>>>   ) extends RDD[(K, V)](base.context, Nil) {
>>>>>>
>>>>>>   def customMethod(bulk: ArrayBuffer[(K, (V, Int))]): MyRDD[K, V] = {
>>>>>>   // ... custom code here
>>>>>>   }
>>>>>>
>>>>>>   override def compute(split: Partition, context: TaskContext):
>>>>>> Iterator[(K, V)] = {
>>>>>>   // ... custom code here
>>>>>>   }
>>>>>>
>>>>>>   override protected def getPartitions: Array[Partition] = {
>>>>>>   // ... custom code here
>>>>>>   }
>>>>>>
>>>>>>   override protected def getDependencies: Seq[Dependency[_]] = {
>>>>>>   // ... custom code here
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> In spark-shell, it turns out that the overridden methods work well,
>>>>>> but when calling myrdd.customMethod(bulk), it throws:
>>>>>> <console>:33: error: value customMethod is not a member of
>>>>>> org.apache.spark.rdd.RDD[(Int, String)]
>>>>>>
>>>>>> Can anyone tell me why the custom method cannot be found?
>>>>>> Or do I have to add customMethod to the abstract RDD and then
>>>>>> override it in the custom RDD?
>>>>>>
>>>>>> PS: spark-version: 1.5.1
>>>>>>
>>>>>> Thanks & Best regards
>>>>>>
>>>>>> Tenghuan
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
