I don't think we guarantee anywhere that union(A, B) will behave by
concatenating the partitions, it just happens to be an artifact of the
current implementation.

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
wouldn't violate the contract of union

AFIAK the only guarentee is the resulting RDD will contain all elements.

- Patrick

On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim <m...@palantir.com> wrote:
> Yes, that’s what I meant. Sure, the numbers might not be actually sorted,
> but the order of rows semantically are kept throughout non-shuffling
> transforms. I’m on board with you on union as well.
>
> Back to the original question, then, why is it important to coalesce to a
> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with three
> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from the
> two reds are concatenated.
>
> Mingyu
>
>
>
>
> On 4/29/14, 10:55 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>
>>If you call map() on an RDD it will retain the ordering it had before,
>>but that is not necessarily a correct sort order for the new RDD.
>>
>>var rdd = sc.parallelize([2, 1, 3]);
>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>>
>>Note that mapped is no longer sorted.
>>
>>When you union two RDD's together it will effectively concatenate the
>>two orderings, which is also not a valid sorted order on the new RDD:
>>
>>rdd1 = [1,2,3]
>>rdd2 = [1,4,5]
>>
>>rdd1.union(rdd2) = [1,2,3,1,4,5]
>>
>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <m...@palantir.com> wrote:
>>> Thanks for the quick response!
>>>
>>> To better understand it, the reason sorted RDD has a well-defined
>>>ordering
>>> is because sortedRDD.getPartitions() returns the partitions in the right
>>> order and each partition internally is properly sorted. So, if you have
>>>
>>> var rdd = sc.parallelize([2, 1, 3]);
>>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>>
>>> Since mapValues doesn’t change the order of partitions not change the
>>> order of rows within the partitions, I think “mapped” should have the
>>> exact same order as “sorted”. Sure, if a transform involves shuffling,
>>>the
>>> order will change. Am I mistaken? Is there an extra detail in sortedRDD
>>> that guarantees a well-defined ordering?
>>>
>>> If it’s true that the order of partitions returned by
>>>RDD.getPartitions()
>>> and the row orders within the partitions determine the row order, I’m
>>>not
>>> sure why union doesn’t respect the order because union operation simply
>>> concatenates the two lists of partitions from the two RDDs.
>>>
>>> Mingyu
>>>
>>>
>>>
>>>
>>> On 4/29/14, 10:25 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>
>>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>>ordering.
>>>>
>>>>But that ordering is lost as soon as you transform the RDD, including
>>>>if you union it with another RDD.
>>>>
>>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <m...@palantir.com> wrote:
>>>>> Hi Patrick,
>>>>>
>>>>> I¹m a little confused about your comment that RDDs are not ordered. As
>>>>>far
>>>>> as I know, RDDs keep list of partitions that are ordered and this is
>>>>>why I
>>>>> can call RDD.take() and get the same first k rows every time I call it
>>>>>and
>>>>> RDD.take() returns the same entries as RDD.map(Š).take() because map
>>>>> preserves the partition order. RDD order is also what allows me to get
>>>>>the
>>>>> top k out of RDD by doing RDD.sort().take().
>>>>>
>>>>> Am I misunderstanding it? Or, is it just when RDD is written to disk
>>>>>that
>>>>> the order is not well preserved? Thanks in advance!
>>>>>
>>>>> Mingyu
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 1/22/14, 4:46 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>>>
>>>>>>Ah somehow after all this time I've never seen that!
>>>>>>
>>>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>>>>><buendia...@gmail.com>
>>>>>>wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
>>>>>>><pwend...@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>>>
>>>>>>>
>>>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>>>
>>>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Another issue is that RDD's are not ordered, so when you union two
>>>>>>>> together it doesn't have a well defined ordering.
>>>>>>>>
>>>>>>>> If you do want to do this you could coalesce into one partition,
>>>>>>>>then
>>>>>>>> call MapPartitions and return an iterator that first adds your
>>>>>>>>header
>>>>>>>> and then the rest of the file, then call saveAsTextFile. Keep in
>>>>>>>>mind
>>>>>>>> this will only work if you coalesce into a single partition.
>>>>>>>
>>>>>>>
>>>>>>> Thanks! I'll give this a try.
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> myRdd.coalesce(1)
>>>>>>>> .map(_.mkString(",")))
>>>>>>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>>>>> .saveAsTextFile("out.csv")
>>>>>>>>
>>>>>>>> - Patrick
>>>>>>>>
>>>>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>>>>> <buendia...@gmail.com> wrote:
>>>>>>>> > Hi,
>>>>>>>> >
>>>>>>>> > I'm trying to find a way to create a csv header when using
>>>>>>>> > saveAsTextFile,
>>>>>>>> > and I came up with this:
>>>>>>>> >
>>>>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>>>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>>>>>>> >       .saveAsTextFile("out.csv")
>>>>>>>> >
>>>>>>>> > But it only saves the header part. Why is that the union method
>>>>>>>>does
>>>>>>>>not
>>>>>>>> > return both RDD's?
>>>>>>>
>>>>>>>

Reply via email to