Hi Ted
I changed
def customable(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
to
def customable(partitioner: Partitioner): MyRDD[K, V] = self.withScope {

after rebuilding the whole Spark project (since it takes a long time, I didn't
do it the way you suggested at first), it also works.
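
For reference, the customable method in PairRDDFunctions.scala now reads roughly
like this (same body as before, only the declared return type changed):

  def customable(partitioner: Partitioner): MyRDD[K, V] = self.withScope {
    new MyRDD[K, V](self, partitioner)
  }
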
Thanks

On Mon, Mar 28, 2016 at 11:01 AM, Tenghuan He <tenghua...@gmail.com> wrote:

> Thanks very much Ted
>
> I added MyRDD.scala to the Spark source code and rebuilt the whole Spark
> project, but using myrdd.asInstanceOf[MyRDD] doesn't work. It seems that MyRDD
> is not exposed to the spark-shell.
>
> Finally I wrote a separate Spark application, added MyRDD.scala to that
> project, and then the custom method can be called in the main function and
> it works.
> I had misunderstood the usage of a custom RDD: it does not have to be added
> to the Spark project like UnionRDD or CogroupedRDD; you can just add it to
> your own project.
>
> On Mon, Mar 28, 2016 at 4:28 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> My interpretation is that the variable myrdd has static type RDD as far as
>> the REPL is concerned, even though it is an instance of MyRDD at runtime.
>>
>> Using asInstanceOf in spark-shell should allow you to call your custom
>> method.
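>>
>> For example, something like this should then work in the shell (assuming
>> MyRDD[K, V] is visible to the REPL and bulk is the ArrayBuffer argument of
>> your custom method):
>>
>> scala> myrdd.asInstanceOf[MyRDD[Int, String]].customMethod(bulk)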
>>
>> Here is the declaration of RDD:
>>
>> abstract class RDD[T: ClassTag](
>>
>> You can extend RDD and include your custom logic in the subclass.
>>
>> On Sun, Mar 27, 2016 at 10:14 AM, Tenghuan He <tenghua...@gmail.com>
>> wrote:
>>
>>> Thanks Ted,
>>>
>>> but I have a doubt: as the spark-shell output below (line 4) shows, myrdd
>>> is already a MyRDD, so doesn't that make sense?
>>>
>>> 1 scala> val part = new org.apache.spark.HashPartitioner(10)
>>> 2 scala> val baseRDD = sc.parallelize(1 to 100000).map(x => (x, "hello")).partitionBy(part).cache()
>>> 3 scala> val myrdd = baseRDD.customable(part)  // here customable is a method added to the abstract RDD to create MyRDD
>>> 4 myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable at
>>> 5 <console>:28
>>> 6 scala> myrdd.customMethod(bulk)
>>> 7 error: value customMethod is not a member of org.apache.spark.rdd.RDD[(Int, String)]
>>>
>>> On Mon, Mar 28, 2016 at 12:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> bq.   def customable(partitioner: Partitioner): RDD[(K, V)] =
>>>> self.withScope {
>>>>
>>>> Above, you declare the return type as RDD, while you actually intended to
>>>> declare MyRDD as the return type.
>>>> Alternatively, you can cast myrdd to MyRDD in spark-shell.
>>>>
>>>> BTW, I don't think it is good practice to add a custom method to the base RDD.
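>>>>
>>>> One alternative is to keep the RDD class untouched and add customable from
>>>> your own project through an implicit class, roughly like this (a sketch,
>>>> names illustrative; withScope is private[spark], so it is dropped here):
>>>>
>>>> import org.apache.spark.Partitioner
>>>> import org.apache.spark.rdd.RDD
>>>>
>>>> object MyRDDConversions {
>>>>   // Adds customable to any RDD[(K, V)] without modifying Spark itself
>>>>   implicit class CustomableOps[K, V](self: RDD[(K, V)]) {
>>>>     def customable(partitioner: Partitioner): MyRDD[K, V] =
>>>>       new MyRDD[K, V](self, partitioner)
>>>>   }
>>>> }
>>>>
>>>> With import MyRDDConversions._ in your application, baseRDD.customable(part)
>>>> resolves to this method and the result is statically typed as MyRDD.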
>>>>
>>>> On Sun, Mar 27, 2016 at 9:44 AM, Tenghuan He <tenghua...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ted,
>>>>>
>>>>> The codes are running in spark-shell
>>>>>
>>>>> scala> val part = new org.apache.spark.HashPartitioner(10)
>>>>> scala> val baseRDD = sc.parallelize(1 to 100000).map(x => (x, "hello")).partitionBy(part).cache()
>>>>> scala> val myrdd = baseRDD.customable(part)  // here customable is a method added to the abstract RDD to create MyRDD
>>>>> myrdd: org.apache.spark.rdd.RDD[(Int, String)] = MyRDD[3] at customable at <console>:28
>>>>> scala> myrdd.customMethod(bulk)
>>>>> error: value customMethod is not a member of org.apache.spark.rdd.RDD[(Int, String)]
>>>>>
>>>>> and the customable method in PairRDDFunctions.scala is
>>>>>
>>>>>   def customable(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
>>>>>     new MyRDD[K, V](self, partitioner)
>>>>>   }
>>>>>
>>>>> Thanks:)
>>>>>
>>>>> On Mon, Mar 28, 2016 at 12:28 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Can you show the full stack trace (or top 10 lines) and the snippet
>>>>>> using your MyRDD ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sun, Mar 27, 2016 at 9:22 AM, Tenghuan He <tenghua...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>>     I am creating a custom RDD which extends RDD and adds a custom
>>>>>>> method; however, the custom method cannot be found.
>>>>>>>     The custom RDD looks like the following:
>>>>>>>
>>>>>>> import scala.collection.mutable.ArrayBuffer
>>>>>>>
>>>>>>> import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}
>>>>>>> import org.apache.spark.rdd.RDD
>>>>>>>
>>>>>>> class MyRDD[K, V](
>>>>>>>     var base: RDD[(K, V)],
>>>>>>>     part: Partitioner
>>>>>>>   ) extends RDD[(K, V)](base.context, Nil) {
>>>>>>>
>>>>>>>   def customMethod(bulk: ArrayBuffer[(K, (V, Int))]): MyRDD[K, V] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>>
>>>>>>>   override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>>
>>>>>>>   override protected def getPartitions: Array[Partition] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>>
>>>>>>>   override protected def getDependencies: Seq[Dependency[_]] = {
>>>>>>>     // ... custom code here
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> In spark-shell, it turns out that the overridden methods work well,
>>>>>>> but calling myrdd.customMethod(bulk) throws:
>>>>>>> <console>:33: error: value customMethod is not a member of
>>>>>>> org.apache.spark.rdd.RDD[(Int, String)]
>>>>>>>
>>>>>>> Can anyone tell me why the custom method cannot be found?
>>>>>>> Or do I have to add customMethod to the abstract RDD and then override
>>>>>>> it in the custom RDD?
>>>>>>>
>>>>>>> PS: spark-version: 1.5.1
>>>>>>>
>>>>>>> Thanks & Best regards
>>>>>>>
>>>>>>> Tenghuan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
