I don't think we guarantee anywhere that union(A, B) will behave by concatenating the partitions; that just happens to be an artifact of the current implementation.

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this; it
                                 // wouldn't violate the contract of union

AFAIK the only guarantee is that the resulting RDD will contain all elements.

- Patrick

On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim <m...@palantir.com> wrote:
> Yes, that’s what I meant. Sure, the numbers might not be actually sorted,
> but the order of rows is semantically kept throughout non-shuffling
> transforms. I’m on board with you on union as well.
>
> Back to the original question, then, why is it important to coalesce to a
> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with three
> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from the
> two RDDs are concatenated.
>
> Mingyu
>
> On 4/29/14, 10:55 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>
>>If you call map() on an RDD it will retain the ordering it had before,
>>but that is not necessarily a correct sort order for the new RDD.
>>
>>var rdd = sc.parallelize([2, 1, 3]);
>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>>
>>Note that mapped is no longer sorted.
>>
>>When you union two RDDs together it will effectively concatenate the
>>two orderings, which is also not a valid sorted order on the new RDD:
>>
>>rdd1 = [1,2,3]
>>rdd2 = [1,4,5]
>>
>>rdd1.union(rdd2) = [1,2,3,1,4,5]
>>
>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <m...@palantir.com> wrote:
>>> Thanks for the quick response!
>>>
>>> To better understand it, the reason sorted RDD has a well-defined ordering
>>> is because sortedRDD.getPartitions() returns the partitions in the right
>>> order and each partition internally is properly sorted.
>>> So, if you have
>>>
>>> var rdd = sc.parallelize([2, 1, 3]);
>>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>>
>>> Since mapValues doesn’t change the order of partitions nor change the
>>> order of rows within the partitions, I think “mapped” should have the
>>> exact same order as “sorted”. Sure, if a transform involves shuffling, the
>>> order will change. Am I mistaken? Is there an extra detail in sortedRDD
>>> that guarantees a well-defined ordering?
>>>
>>> If it’s true that the order of partitions returned by RDD.getPartitions()
>>> and the row orders within the partitions determine the row order, I’m not
>>> sure why union doesn’t respect the order because union operation simply
>>> concatenates the two lists of partitions from the two RDDs.
>>>
>>> Mingyu
>>>
>>> On 4/29/14, 10:25 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>
>>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>>ordering.
>>>>
>>>>But that ordering is lost as soon as you transform the RDD, including
>>>>if you union it with another RDD.
>>>>
>>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <m...@palantir.com> wrote:
>>>>> Hi Patrick,
>>>>>
>>>>> I’m a little confused about your comment that RDDs are not ordered. As far
>>>>> as I know, RDDs keep a list of partitions that are ordered and this is why I
>>>>> can call RDD.take() and get the same first k rows every time I call it and
>>>>> RDD.take() returns the same entries as RDD.map(…).take() because map
>>>>> preserves the partition order. RDD order is also what allows me to get the
>>>>> top k out of RDD by doing RDD.sort().take().
>>>>>
>>>>> Am I misunderstanding it? Or, is it just when RDD is written to disk that
>>>>> the order is not well preserved? Thanks in advance!
>>>>>
>>>>> Mingyu
>>>>>
>>>>> On 1/22/14, 4:46 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>>>
>>>>>>Ah somehow after all this time I've never seen that!
>>>>>>
>>>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia <buendia...@gmail.com>
>>>>>>wrote:
>>>>>>>
>>>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell <pwend...@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>>>
>>>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>>>
>>>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>>>
>>>>>>>>
>>>>>>>> Another issue is that RDDs are not ordered, so when you union two
>>>>>>>> together it doesn't have a well defined ordering.
>>>>>>>>
>>>>>>>> If you do want to do this you could coalesce into one partition, then
>>>>>>>> call mapPartitions and return an iterator that first adds your header
>>>>>>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>>>>>>> this will only work if you coalesce into a single partition.
>>>>>>>
>>>>>>> Thanks! I'll give this a try.
>>>>>>>
>>>>>>>>
>>>>>>>> myRdd.coalesce(1)
>>>>>>>> .map(_.mkString(","))
>>>>>>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>>>>> .saveAsTextFile("out.csv")
>>>>>>>>
>>>>>>>> - Patrick
>>>>>>>>
>>>>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>>>>> <buendia...@gmail.com> wrote:
>>>>>>>> > Hi,
>>>>>>>> >
>>>>>>>> > I'm trying to find a way to create a csv header when using
>>>>>>>> > saveAsTextFile, and I came up with this:
>>>>>>>> >
>>>>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>>>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>>>>>>> > .saveAsTextFile("out.csv")
>>>>>>>> >
>>>>>>>> > But it only saves the header part. Why is it that the union method
>>>>>>>> > does not return both RDDs?
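
The disagreement in this thread is between what union happens to do (concatenate the partition lists) and what it promises (every element is present). A rough way to see the difference is the toy model below. It is plain Scala, not Spark: ToyRdd, unionConcat, unionSwapped, tagged and orderedRows are all hypothetical names made up for this sketch, and none of them is a Spark API. It also shows one possible way to stop depending on union's order, which is to tag each row with an explicit (source, position) key and sort on that key.

```scala
// Toy model only: plain Scala collections, no Spark involved.
// Every name in this object is hypothetical and exists just for illustration.
object UnionOrderSketch {

  // An "RDD" is modelled as an ordered list of partitions.
  final case class ToyRdd[T](partitions: Vector[Vector[T]]) {
    // A non-shuffling transform: partition order and row order are untouched.
    def map[U](f: T => U): ToyRdd[U] = ToyRdd(partitions.map(_.map(f)))
    // Rows in partition order, then in row order within each partition.
    def collect: Vector[T] = partitions.flatten
  }

  // Models the behaviour described as "how it is now": concatenate partitions.
  def unionConcat[T](a: ToyRdd[T], b: ToyRdd[T]): ToyRdd[T] =
    ToyRdd(a.partitions ++ b.partitions)

  // A different implementation that still returns every element.
  def unionSwapped[T](a: ToyRdd[T], b: ToyRdd[T]): ToyRdd[T] =
    ToyRdd(b.partitions ++ a.partitions)

  // Attach an explicit (source, position) key to every row, so the intended
  // order is carried by the data rather than by the union implementation.
  def tagged[T](source: Int, rdd: ToyRdd[T]): ToyRdd[((Int, Int), T)] = {
    val rows = rdd.collect.zipWithIndex.map { case (row, i) => ((source, i), row) }
    ToyRdd(Vector(rows))
  }

  // Recover the intended order by sorting on the explicit key.
  def orderedRows[T](unioned: ToyRdd[((Int, Int), T)]): Vector[T] =
    unioned.collect.sortBy(_._1).map(_._2)

  def main(args: Array[String]): Unit = {
    val header = ToyRdd(Vector(Vector("col1,col2,col3")))
    val rows   = ToyRdd(Vector(Vector("1,2,3"), Vector("4,5,6")))

    // Same elements, different order: both satisfy "contains all elements".
    println(unionConcat(header, rows).collect)
    println(unionSwapped(header, rows).collect)

    // With explicit keys, both union variants give the same final order.
    val h = tagged(0, header)
    val r = tagged(1, rows)
    println(orderedRows(unionConcat(h, r)))
    println(orderedRows(unionSwapped(h, r)))
  }
}
```

In this model the header only comes out first reliably once the order is made explicit. The coalesce(1) plus mapPartitions approach quoted above gets the same effect by writing the header inside the single partition instead of relying on union.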