Hi Attila,

Thanks for looking into this!

I actually found the issue and it turned out to be that the print
statements misled me. The records are indeed stored in different partitions.
What happened is since the foreachpartition method is run parallelly by
different threads, they all printed the first line almost at the same time
and followed by data which is also printed at almost the same time. This
has given an appearance that all the data is stored in a single partition.
When I run the below code, I can see that the objects are stored in
different partitions of course!

*myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
println("Index : " +index +" " + e)); itr}, true).collect()*

Prints the below... (index: ?  the ? is actually the partition number)
*Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *

Thanks!

On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

> Hi!
>
> This is weird. The code of foreachPartition
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
>  leads
> to ParallelCollectionRDD
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>
>  which
> ends in slice
> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
> where the most important part is the *positions* method:
>
>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>      (0 until numSlices).iterator.map { i =>
>         val start = ((i * length) / numSlices).toInt
>         val end = (((i + 1) * length) / numSlices).toInt
>         (start, end)
>      }
>  }
>
> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
> some scala implicit might generate a Seq with one Array in it.
> But in that case your output would contain an Array. So this must be not
> the case.
>
> 1) What Spark/Scala version you are using? on what OS?
>
> 2)  Can you reproduce this issue in the spark-shell?
>
> scala> case class Animal(id:Int, name:String)
> defined class Animal
>
> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
> Tiger"), Animal(5, "Chetah") ) ), 12)
> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
> parallelize at <console>:27
>
> scala> myRDD.foreachPartition( e => { println("----------");
> e.foreach(println) } )
> ----------
> ----------
> ----------
> Animal(1,Lion)
> ----------
> ----------
> Animal(2,Elephant)
> ----------
> ----------
> ----------
> Animal(3,Jaguar)
> ----------
> ----------
> Animal(4,Tiger)
> ----------
> ----------
> Animal(5,Chetah)
>
> scala> Console println myRDD.getNumPartitions
> 12
>
> 3) Can you please check spark-shell what happens when you paste the above
> method and call it like:
>
> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
> {
>      |   (0 until numSlices).iterator.map { i =>
>      |     val start = ((i * length) / numSlices).toInt
>      |       val end = (((i + 1) * length) / numSlices).toInt
>      |       (start, end)
>      |   }
>      | }
> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>
> scala> positions(5, 12).foreach(println)
> (0,0)
> (0,0)
> (0,1)
> (1,1)
> (1,2)
> (2,2)
> (2,2)
> (2,3)
> (3,3)
> (3,4)
> (4,4)
> (4,5)
>
> As you can see in my case the `positions` result consistent with the 
> `foreachPartition`
> and this should be deterministic.
>
> Best regards,
> Attila
>
>
> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a question with respect to default partitioning in RDD.
>>
>>
>>
>>
>> *case class Animal(id:Int, name:String)   val myRDD =
>> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
>> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>>
>> I am running the above piece of code in my laptop which has 12 logical
>> cores.
>> Hence I see that there are 12 partitions created.
>>
>> My understanding is that hash partitioning is used to determine which
>> object needs to go to which partition. So in this case, the formula would
>> be: hashCode() % 12
>> But when I further examine, I see all the RDDs are put in the last
>> partition.
>>
>> *myRDD.foreachPartition( e => { println("----------"); e.foreach(println)
>> } )*
>>
>> Above code prints the below(first eleven partitions are empty and the
>> last one has all the objects. The line is separate the partition contents):
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> ----------
>> Animal(2,Elephant)
>> Animal(4,Tiger)
>> Animal(3,Jaguar)
>> Animal(5,Chetah)
>> Animal(1,Lion)
>>
>> I don't know why this happens. Can you please help.
>>
>> Thanks!
>>
>

Reply via email to