Thanks for the quick response!

To better understand it, the reason sorted RDD has a well-defined ordering
is because sortedRDD.getPartitions() returns the partitions in the right
order and each partition internally is properly sorted. So, if you have

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]

Since mapValues doesn’t change the order of partitions not change the
order of rows within the partitions, I think “mapped” should have the
exact same order as “sorted”. Sure, if a transform involves shuffling, the
order will change. Am I mistaken? Is there an extra detail in sortedRDD
that guarantees a well-defined ordering?

If it’s true that the order of partitions returned by RDD.getPartitions()
and the row orders within the partitions determine the row order, I’m not
sure why union doesn’t respect the order because union operation simply
concatenates the two lists of partitions from the two RDDs.

Mingyu




On 4/29/14, 10:25 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:

>You are right, once you sort() the RDD, then yes it has a well defined
>ordering.
>
>But that ordering is lost as soon as you transform the RDD, including
>if you union it with another RDD.
>
>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <m...@palantir.com> wrote:
>> Hi Patrick,
>>
>> I¹m a little confused about your comment that RDDs are not ordered. As
>>far
>> as I know, RDDs keep list of partitions that are ordered and this is
>>why I
>> can call RDD.take() and get the same first k rows every time I call it
>>and
>> RDD.take() returns the same entries as RDD.map(Š).take() because map
>> preserves the partition order. RDD order is also what allows me to get
>>the
>> top k out of RDD by doing RDD.sort().take().
>>
>> Am I misunderstanding it? Or, is it just when RDD is written to disk
>>that
>> the order is not well preserved? Thanks in advance!
>>
>> Mingyu
>>
>>
>>
>>
>> On 1/22/14, 4:46 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>
>>>Ah somehow after all this time I've never seen that!
>>>
>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
>>><buendia...@gmail.com>
>>>wrote:
>>>>
>>>>
>>>>
>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell <pwend...@gmail.com>
>>>> wrote:
>>>>>
>>>>> What is the ++ operator here? Is this something you defined?
>>>>
>>>>
>>>> No, it's an alias for union defined in RDD.scala:
>>>>
>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>
>>>>>
>>>>>
>>>>> Another issue is that RDD's are not ordered, so when you union two
>>>>> together it doesn't have a well defined ordering.
>>>>>
>>>>> If you do want to do this you could coalesce into one partition, then
>>>>> call MapPartitions and return an iterator that first adds your header
>>>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>>>> this will only work if you coalesce into a single partition.
>>>>
>>>>
>>>> Thanks! I'll give this a try.
>>>>
>>>>>
>>>>>
>>>>> myRdd.coalesce(1)
>>>>> .map(_.mkString(",")))
>>>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>> .saveAsTextFile("out.csv")
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>> <buendia...@gmail.com> wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I'm trying to find a way to create a csv header when using
>>>>> > saveAsTextFile,
>>>>> > and I came up with this:
>>>>> >
>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>>>> >       .saveAsTextFile("out.csv")
>>>>> >
>>>>> > But it only saves the header part. Why is that the union method
>>>>>does
>>>>>not
>>>>> > return both RDD's?
>>>>
>>>>

Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to