If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

val rdd = sc.parallelize(Seq(2, 1, 3))
val sorted = rdd.map(x => (x, x)).sortByKey() // should be [1, 2, 3]
val mapped = sorted.mapValues(x => 3 - x) // values should be [2, 1, 0]

Note that the values in mapped are no longer in sorted order.
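
For anyone who wants to try this out, here is a rough, self-contained
sketch of the same example. It assumes a reasonably recent Spark on the
classpath and a throwaway local-mode context (older releases may also
need import org.apache.spark.SparkContext._ for sortByKey). The object
and method names (MapAfterSort, keysAndValues) are made up for
illustration and are not part of Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MapAfterSort {
  // Hypothetical helper: returns (keys, values) of the mapped RDD in collect() order.
  def keysAndValues(sc: SparkContext): (Seq[Int], Seq[Int]) = {
    val sorted = sc.parallelize(Seq(2, 1, 3)).map(x => (x, x)).sortByKey()
    // mapValues leaves every element where it is and only rewrites the value.
    val mapped = sorted.mapValues(x => 3 - x)
    (mapped.keys.collect().toSeq, mapped.values.collect().toSeq)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, purely for experimentation.
    val conf = new SparkConf().setAppName("map-after-sort").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (keys, values) = keysAndValues(sc)
      println(keys.mkString(","))   // keys are still ascending
      println(values.mkString(",")) // values are not
    } finally {
      sc.stop()
    }
  }
}
```

The element positions are the same before and after mapValues; what
changes is whether that arrangement still counts as sorted for the data
you now care about.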

When you union two RDDs together, the result effectively concatenates the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]
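
If you need the combined data in sorted order, one option is to sort
again after the union. A minimal sketch, under the same assumptions as
any quick local experiment (Spark on the classpath, local-mode context);
UnionOrder and unionThenSort are hypothetical names used only here:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnionOrder {
  // Hypothetical helper: returns (union as collected, union after an explicit sort).
  def unionThenSort(sc: SparkContext): (Seq[Int], Seq[Int]) = {
    val rdd1 = sc.parallelize(Seq(1, 2, 3))
    val rdd2 = sc.parallelize(Seq(1, 4, 5))
    val combined = rdd1.union(rdd2)
    // sortBy shuffles the data into a new, globally sorted RDD.
    val resorted = combined.sortBy(x => x)
    (combined.collect().toSeq, resorted.collect().toSeq)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, purely for experimentation.
    val conf = new SparkConf().setAppName("union-order").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (combined, resorted) = unionThenSort(sc)
      println(combined.mkString(",")) // rdd1's elements followed by rdd2's
      println(resorted.mkString(",")) // ascending order after the explicit sort
    } finally {
      sc.stop()
    }
  }
}
```

The extra sort costs a shuffle, so it only makes sense when downstream
code really depends on the order.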

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <m...@palantir.com> wrote:
> Thanks for the quick response!
>
> To better understand it, the reason sorted RDD has a well-defined ordering
> is because sortedRDD.getPartitions() returns the partitions in the right
> order and each partition internally is properly sorted. So, if you have
>
> val rdd = sc.parallelize(Seq(2, 1, 3))
> val sorted = rdd.map(x => (x, x)).sortByKey() // should be [1, 2, 3]
> val mapped = sorted.mapValues(x => x + 1) // values should be [2, 3, 4]
>
> Since mapValues doesn’t change the order of partitions nor the
> order of rows within the partitions, I think “mapped” should have the
> exact same order as “sorted”. Sure, if a transform involves shuffling, the
> order will change. Am I mistaken? Is there an extra detail in sortedRDD
> that guarantees a well-defined ordering?
>
> If it’s true that the order of partitions returned by RDD.getPartitions()
> and the row orders within the partitions determine the row order, I’m not
> sure why union doesn’t respect the order, because the union operation simply
> concatenates the two lists of partitions from the two RDDs.
>
> Mingyu
>
>
>
>
> On 4/29/14, 10:25 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>
>>You are right, once you sort() the RDD, then yes it has a well-defined
>>ordering.
>>
>>But that ordering is lost as soon as you transform the RDD, including
>>if you union it with another RDD.
>>
>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <m...@palantir.com> wrote:
>>> Hi Patrick,
>>>
>>> I'm a little confused about your comment that RDDs are not ordered. As far
>>> as I know, RDDs keep a list of partitions that are ordered, and this is why I
>>> can call RDD.take() and get the same first k rows every time I call it, and
>>> RDD.take() returns the same entries as RDD.map(…).take() because map
>>> preserves the partition order. RDD order is also what allows me to get the
>>> top k out of an RDD by doing RDD.sort().take().
>>>
>>> Am I misunderstanding it? Or is it just when an RDD is written to disk that
>>> the order is not well preserved? Thanks in advance!
>>>
>>> Mingyu
>>>
>>>
>>>
>>>
>>> On 1/22/14, 4:46 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>
>>>>Ah somehow after all this time I've never seen that!
>>>>
>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia <buendia...@gmail.com> wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell <pwend...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>
>>>>>
>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>
>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>
>>>>>>
>>>>>>
>>>>>> Another issue is that RDDs are not ordered, so when you union two
>>>>>> together the result doesn't have a well-defined ordering.
>>>>>>
>>>>>> If you do want to do this you could coalesce into one partition, then
>>>>>> call mapPartitions and return an iterator that first adds your header
>>>>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>>>>> this will only work if you coalesce into a single partition.
>>>>>
>>>>>
>>>>> Thanks! I'll give this a try.
>>>>>
>>>>>>
>>>>>>
>>>>>> myRdd.coalesce(1)
>>>>>> .map(_.mkString(","))
>>>>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>>> .saveAsTextFile("out.csv")
>>>>>>
>>>>>> - Patrick
>>>>>>
>>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>>> <buendia...@gmail.com> wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > I'm trying to find a way to create a csv header when using
>>>>>> > saveAsTextFile,
>>>>>> > and I came up with this:
>>>>>> >
>>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>>>>> >       .saveAsTextFile("out.csv")
>>>>>> >
>>>>>> > But it only saves the header part. Why is it that the union method
>>>>>> > does not return both RDDs?
>>>>>
>>>>>
