Ajay,

This is true. When we call join again on the two RDDs, rather than
recomputing the whole pipeline, Spark reads the map output of each RDD's
map stage (which it usually gets from the shuffle manager).
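
For context, join itself is built on top of cogroup, which is why the code
to look at is CoGroupedRDD. Roughly (paraphrasing PairRDDFunctions, so the
exact body may differ slightly between versions):

  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    // cogroup both RDDs on the key, then pair up every value combination
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1; w <- pair._2) yield (v, w)
    )
  }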

If you see the code:

  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
    val sparkConf = SparkEnv.get.conf
    val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
    val split = s.asInstanceOf[CoGroupPartition]

    // A list of (rdd iterator, dependency number) pairs
    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]

    for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
      case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
        // Narrow dependency: read the partition directly from the parent
        val it = rdd.iterator(itsSplit, context)
          .asInstanceOf[Iterator[Product2[K, Any]]]
        rddIterators += ((it, depNum))

      case ShuffleCoGroupSplitDep(handle) =>
        // Shuffle dependency: read the map outputs of the shuffle
        val it = SparkEnv.get.shuffleManager
          .getReader(handle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }
    // ... (the iterators are then combined into the cogrouped result)

This is from CoGroupedRDD.scala in the Spark 1.3 code. The
NarrowCoGroupSplitDep case reads the partition straight from the parent
RDD, while the ShuffleCoGroupSplitDep case fetches the already-written map
output through the shuffle manager, so the map stage is not executed again.
If you look at the UI, it shows these map stages as skipped. (And this
answers your question as well, Hoai-Thu Vong, in the different thread about
skipped stages.)
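
To see it end to end, here is a minimal sketch (assuming a SparkContext sc,
e.g. in spark-shell; the paths are only illustrative):

  val left  = sc.textFile("hdfs://.../left.txt").map(line => (line, 1))
  val right = sc.textFile("hdfs://.../right.txt").map(line => (line, 1))
  val joined = left.join(right)

  joined.collect()  // 1st job: reads the files, runs both map stages and
                    // writes their shuffle output to the executors' local disks
  joined.collect()  // 2nd job: both map stages show as "skipped"; only the
                    // shuffle read happens, served by the shuffle manager

The map output stays on disk after the first job, which is also why you see
only a shuffle read and no new shuffle write the second time.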

Thanks and Regards,

Archit Thakur.




On Thu, Nov 13, 2014 at 3:10 PM, ajay garg <ajay.g...@mobileum.com> wrote:

> Yes, that is my understanding of how it should work.
> But in my case, when I call collect the first time, it reads the data from
> files on the disk.
> Subsequent collect queries are not reading the data files (verified from
> the logs).
> On the Spark UI I see only shuffle read and no shuffle write.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joined-RDD-tp18820p18829.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
