Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/44#discussion_r10194233
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -847,6 +847,8 @@ class SparkContext(
           partitions: Seq[Int],
           allowLocal: Boolean,
           resultHandler: (Int, U) => Unit) {
    +    val rddPartitions = rdd.partitions.map(_.index)
    +    require(partitions.forall(rddPartitions.contains(_)), "partition index 
out of range")
    --- End diff --
    
    Just scanned the code, this issue (partitions does not match with 
rdd.partitions.map(_.index)) can only happen when you run the computation based 
on the correctness of partitioner
    
    In current implementation, there are only two cases:
    
    First is lookup, the computation is based on the correctness of 
getPartition()
    
    ```
    def lookup(key: K): Seq[V] = {
        self.partitioner match {
          case Some(p) =>
            val index = p.getPartition(key)
            def process(it: Iterator[(K, V)]): Seq[V] = {
              val buf = new ArrayBuffer[V]
              for ((k, v) <- it if k == key) {
                buf += v
              }
              buf
            }
            val res = self.context.runJob(self, process _, Array(index), false)
            res(0)
          case None =>
            self.filter(_._1 == key).map(_._2).collect()
        }
      }
    
    ```
    
    the other case is ShuffleMapTask
    
    ```
    // Write the map output to its associated buckets.
          for (elem <- rdd.iterator(split, context)) {
            val pair = elem.asInstanceOf[Product2[Any, Any]]
            val bucketId = dep.partitioner.getPartition(pair._1)
            shuffle.writers(bucketId).write(pair)
          }
    
    ```
    
    I'm not sure which fix option is better, add a checking condition in 
SparkContext, or we have a specific checking in these two places separately
    
    I just felt that without looking at the code, I cannot get the idea why the 
partitions does not match rdd.partitions (if you look at how SparkContext run 
the job you will get more confusion, because partitions are exactly derived 
from "0 until rdd.partitions.size")
    
    ```
    /**
       * Run a job on all partitions in an RDD and return the results in an 
array.
       */
      def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] 
= {
        runJob(rdd, func, 0 until rdd.partitions.size, false)
      }
    
    ```
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to