Hi Thierry,

Your code does not work if @yh18190 wants a global counter. An RDD may have
more than one partition, and each task works on its own copy of cnt, so the
counter starts again from -1 in every partition. You can try the following
code:

scala> val rdd = sc.parallelize( (1, 'a') :: (2, 'b') :: (3, 'c') :: (4,
'd') :: Nil)
rdd: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[3] at
parallelize at <console>:12

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val rdd2 = rdd.partitionBy(new HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, Char)] = ShuffledRDD[4] at partitionBy
at <console>:18

scala> var cnt = -1
cnt: Int = -1

scala> val rdd3 = rdd2.map(i => {cnt+=1;  (cnt,i)} )
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Char))] = MappedRDD[5] at map at
<console>:22

scala> rdd3.collect
res2: Array[(Int, (Int, Char))] = Array((0,(2,b)), (1,(4,d)), (0,(1,a)),
(1,(3,c)))

A proper solution is to use "rdd.partitionBy(new HashPartitioner(1))" so
that there is only one partition. But that is not efficient for large
inputs, because a single task then has to process all the data.
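
If a single partition is too slow, one possible alternative is a two-pass
scheme: first count the elements in each partition, then turn the counts
into starting offsets and add each element's local position to its
partition's offset. Below is a minimal sketch of that idea in plain Scala,
not against the Spark API: each inner Seq stands in for one partition, and
the names GlobalIndexSketch, startOffsets and indexGlobally are made up for
illustration. In Spark itself the two passes could probably be written with
mapPartitionsWithIndex, and I believe newer releases also provide
RDD.zipWithIndex for this purpose.

```scala
// Plain-Scala model of two-pass global indexing. Each inner Seq stands in
// for one RDD partition; no Spark API is used here (illustrative only).
object GlobalIndexSketch {
  // Pass 1: count the elements in each partition and turn the counts into
  // starting offsets, e.g. partition sizes (2, 2) give offsets (0, 2).
  def startOffsets[A](partitions: Seq[Seq[A]]): Seq[Long] =
    partitions.map(_.size.toLong).scanLeft(0L)(_ + _).init

  // Pass 2: inside each partition, add the element's local position to
  // that partition's starting offset.
  def indexGlobally[A](partitions: Seq[Seq[A]]): Seq[Seq[(Long, A)]] = {
    val offsets = startOffsets(partitions)
    partitions.zip(offsets).map { case (part, offset) =>
      part.zipWithIndex.map { case (elem, i) => (offset + i, elem) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Same layout as the two-partition result shown above.
    val partitions = Seq(Seq((2, 'b'), (4, 'd')), Seq((1, 'a'), (3, 'c')))
    indexGlobally(partitions).flatten.foreach(println)
  }
}
```

With the two-partition layout from the session above, the indices run from
0 to 3 without repeating. The price is that the data is traversed twice,
once to count and once to assign the indices.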

Best Regards,
Shixiong Zhu


2014-04-02 11:10 GMT+08:00 Thierry Herrmann <thierry.herrm...@gmail.com>:

> I'm new to Spark, but isn't this a pure Scala question?
>
> The following seems to work with the spark shell:
>
> $ spark-shell
>
> scala> val rdd = sc.makeRDD(List(10,20,30))
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at
> <console>:12
>
> scala> var cnt = -1
> cnt: Int = -1
>
> scala> val rdd2 = rdd.map(i => {cnt+=1;  (cnt,i)} )
> rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[9] at map at
> <console>:16
>
> scala> rdd2.collect
> res8: Array[(Int, Int)] = Array((0,10), (1,20), (2,30))
>
> Thierry
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-index-each-map-operation-tp3471p3614.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
