If you call map() on an RDD it will retain the ordering it had before, but that is not necessarily a correct sort order for the new RDD.

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]

Note that mapped is no longer sorted.

When you union two RDDs together it will effectively concatenate the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <m...@palantir.com> wrote:
> Thanks for the quick response!
>
> To better understand it, the reason sorted RDD has a well-defined ordering
> is because sortedRDD.getPartitions() returns the partitions in the right
> order and each partition internally is properly sorted. So, if you have
>
> var rdd = sc.parallelize([2, 1, 3]);
> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>
> Since mapValues doesn’t change the order of partitions nor change the
> order of rows within the partitions, I think “mapped” should have the
> exact same order as “sorted”. Sure, if a transform involves shuffling, the
> order will change. Am I mistaken? Is there an extra detail in sortedRDD
> that guarantees a well-defined ordering?
>
> If it’s true that the order of partitions returned by RDD.getPartitions()
> and the row orders within the partitions determine the row order, I’m not
> sure why union doesn’t respect the order because union operation simply
> concatenates the two lists of partitions from the two RDDs.
>
> Mingyu
>
> On 4/29/14, 10:25 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>
>>You are right, once you sort() the RDD, then yes it has a well defined
>>ordering.
>>
>>But that ordering is lost as soon as you transform the RDD, including
>>if you union it with another RDD.
>>
>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <m...@palantir.com> wrote:
>>> Hi Patrick,
>>>
>>> I'm a little confused about your comment that RDDs are not ordered. As far
>>> as I know, RDDs keep a list of partitions that are ordered and this is why I
>>> can call RDD.take() and get the same first k rows every time I call it and
>>> RDD.take() returns the same entries as RDD.map(…).take() because map
>>> preserves the partition order. RDD order is also what allows me to get the
>>> top k out of RDD by doing RDD.sort().take().
>>>
>>> Am I misunderstanding it? Or, is it just when RDD is written to disk that
>>> the order is not well preserved? Thanks in advance!
>>>
>>> Mingyu
>>>
>>> On 1/22/14, 4:46 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>
>>>>Ah somehow after all this time I've never seen that!
>>>>
>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia <buendia...@gmail.com>
>>>>wrote:
>>>>>
>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell <pwend...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>
>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>
>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>
>>>>>>
>>>>>> Another issue is that RDDs are not ordered, so when you union two
>>>>>> together it doesn't have a well defined ordering.
>>>>>>
>>>>>> If you do want to do this you could coalesce into one partition, then
>>>>>> call mapPartitions and return an iterator that first adds your header
>>>>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>>>>> this will only work if you coalesce into a single partition.
>>>>>
>>>>> Thanks! I'll give this a try.
>>>>>
>>>>>>
>>>>>> myRdd.coalesce(1)
>>>>>>   .map(_.mkString(","))
>>>>>>   .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>>>   .saveAsTextFile("out.csv")
>>>>>>
>>>>>> - Patrick
>>>>>>
>>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>>> <buendia...@gmail.com> wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > I'm trying to find a way to create a csv header when using
>>>>>> > saveAsTextFile, and I came up with this:
>>>>>> >
>>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>>> >   myRdd.coalesce(1).map(_.mkString(",")))
>>>>>> >   .saveAsTextFile("out.csv")
>>>>>> >
>>>>>> > But it only saves the header part. Why is it that the union method
>>>>>> > does not return both RDDs?
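
For reference, the two claims debated in this thread (an element-wise transform keeps row positions but not sortedness, and union concatenates the partition lists) can be modelled in plain Scala without a Spark cluster. This is only a sketch under stated assumptions: PartitionedData, OrderingSketch, and isSorted are hypothetical names invented for illustration and are not Spark API; the model assumes row order is fully determined by partition order plus row order within each partition, as Mingyu describes, and it says nothing about what Spark guarantees after a shuffle.

```scala
// Hypothetical plain-Scala model (NOT Spark API): a dataset is an ordered
// list of partitions, each partition an ordered list of rows.
final case class PartitionedData[A](partitions: Vector[Vector[A]]) {
  // Row order as seen when reading partitions in order, rows in order.
  def rows: Vector[A] = partitions.flatten

  // Element-wise transform: partition order and row positions are untouched.
  def map[B](f: A => B): PartitionedData[B] =
    PartitionedData(partitions.map(_.map(f)))

  // Union modelled as concatenating the two partition lists.
  def union(other: PartitionedData[A]): PartitionedData[A] =
    PartitionedData(partitions ++ other.partitions)
}

object OrderingSketch {
  // True when every row is <= the row that follows it.
  def isSorted(xs: Seq[Int]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // A sorted dataset spread over two partitions.
  val sorted: PartitionedData[Int] =
    PartitionedData(Vector(Vector(1, 2), Vector(3)))

  // Same positions as `sorted`, but the values are no longer ascending.
  val mapped: PartitionedData[Int] = sorted.map(x => 3 - x)

  // Two individually sorted datasets from the example above.
  val rdd1: PartitionedData[Int] = PartitionedData(Vector(Vector(1, 2, 3)))
  val rdd2: PartitionedData[Int] = PartitionedData(Vector(Vector(1, 4, 5)))

  // Concatenation of the two orderings, which is not a sorted order.
  val unioned: PartitionedData[Int] = rdd1.union(rdd2)
}
```

In this model both readings in the thread hold at once: the row order after map or union is well defined and repeatable, but it is no longer a valid sort order for the new data.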