The relevant JIRA that springs to mind is
https://issues.apache.org/jira/browse/SPARK-2926

If an aggregator and ordering are both defined, then the map side of
sort-based shuffle will sort based on the key ordering so that map-side
spills can be efficiently merged.  We do not currently do a sort-based
merge on the reduce side; implementing this is a little tricky because it
will require more map partitions' output to be buffered on the reduce
side.  I think that SPARK-2926 has some proposals for how to deal with this,
including hierarchical merging of reduce outputs.
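For illustration, here is a minimal Scala sketch (not Spark's actual code) of a sort-based reduce-side merge. It assumes each map output arrives already sorted by key under a known Ordering. A priority queue merges the outputs, and adjacent records with equal keys are combined as they stream past. Each input contributes only its current head to the heap, but the reducer needs one open input per map output, and that is where the extra reduce-side buffering comes from. The names SortedMergeSketch, mergeSorted and mergeCombine are hypothetical.

```scala
import scala.collection.mutable

object SortedMergeSketch {
  // Merge inputs that are each already sorted by key under `ord` into a single
  // key-sorted iterator. The heap holds one buffered iterator per input.
  def mergeSorted[K, V](inputs: Seq[Iterator[(K, V)]])(implicit ord: Ordering[K]): Iterator[(K, V)] = {
    // mutable.PriorityQueue dequeues the largest element first, so reverse the
    // ordering on each input's current head key to get the smallest key out first.
    val heapOrder: Ordering[BufferedIterator[(K, V)]] =
      Ordering.by[BufferedIterator[(K, V)], K](_.head._1).reverse
    val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](heapOrder)
    inputs.map(_.buffered).filter(_.hasNext).foreach(it => heap.enqueue(it))

    new Iterator[(K, V)] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): (K, V) = {
        val it = heap.dequeue()
        val record = it.next()
        // Re-insert the input only after advancing it, so the heap sees its new head.
        if (it.hasNext) heap.enqueue(it)
        record
      }
    }
  }

  // Combine adjacent records whose keys are equal under `ord`.
  def mergeCombine[K, V](sorted: Iterator[(K, V)], mergeValue: (V, V) => V)
                        (implicit ord: Ordering[K]): Iterator[(K, V)] = {
    val in = sorted.buffered
    new Iterator[(K, V)] {
      def hasNext: Boolean = in.hasNext
      def next(): (K, V) = {
        val (key, first) = in.next()
        var acc = first
        while (in.hasNext && ord.equiv(in.head._1, key)) acc = mergeValue(acc, in.next()._2)
        (key, acc)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Three hypothetical map outputs for one reduce partition, each sorted by key.
    val mapOutputs = Seq(
      Iterator("a" -> 1, "c" -> 2),
      Iterator("a" -> 3, "b" -> 4),
      Iterator("c" -> 5))
    println(mergeCombine(mergeSorted(mapOutputs), (x: Int, y: Int) => x + y).toList)
    // prints List((a,4), (b,4), (c,7))
  }
}
```

The merge only works because a total key ordering exists; without one, records with "equal" positions in the sort are not guaranteed to share a key.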

RE: ExternalSorter#partitionedIterator, I don't think it's safe to change that
check to "!ordering.isDefined && !aggregator.isDefined".  If an aggregator is
defined but we don't have an ordering, then I don't think it makes sense to
sort the keys based on their hash codes or some default ordering, since
hash code collisions (distinct keys with equal hash codes) would lead to
incorrect results for sort-based aggregation.
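To make the collision concern concrete, here is a small Scala sketch (illustrative only, not Spark code) of a naive sort-based aggregation. It sorts records by key hash code and then treats adjacent records with equal hash codes as the same key. The strings "Aa" and "BB" have the same Java hashCode (2112), so the naive version silently folds one key into the other. HashCollisionSketch, naiveHashAggregate and exactAggregate are hypothetical names.

```scala
object HashCollisionSketch {
  // Naive: sort by key hash code, then assume equal adjacent hash codes mean equal keys.
  def naiveHashAggregate(records: Seq[(String, Int)]): List[(String, Int)] = {
    val sorted = records.sortBy(_._1.hashCode) // stable sort: ties keep input order
    sorted.foldLeft(List.empty[(String, Int)]) {
      case ((k, acc) :: rest, (key, v)) if k.hashCode == key.hashCode => (k, acc + v) :: rest
      case (out, rec) => rec :: out
    }.reverse
  }

  // Reference result: group by actual key equality.
  def exactAggregate(records: Seq[(String, Int)]): Map[String, Int] =
    records.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val records = Seq("Aa" -> 1, "BB" -> 10, "Aa" -> 100)
    println("Aa".hashCode == "BB".hashCode) // prints true (both are 2112)
    println(naiveHashAggregate(records))    // prints List((Aa,111)): "BB" was merged into "Aa"
    println(exactAggregate(records) == Map("Aa" -> 101, "BB" -> 10)) // prints true
  }
}
```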

On Tue, Jun 2, 2015 at 1:50 PM, John Carrino <john.carr...@gmail.com> wrote:

> One thing I have noticed with ExternalSorter is that if an ordering is not
> defined, it does the sort using only the partition_id, instead of
> (partition_id, hash).  This means that on the reduce side you need to pull
> the entire dataset into memory before you can begin iterating over the
> results.
>
> I figure since we are sorting the data anyway, it shouldn't be more
> expensive to sort by (partition, hash).  That way the reducer can do a merge
> and only has to hold in memory the data for a single int hashCode before
> it can combine them and start returning results from the iterator.
>
> Has this already been discussed?  If so, can someone point me in the right
> direction to find out more?
>
> Thanks for any help!
> -jc
>
> p.s. I am using spark version 1.3.1.  The code I am looking at below is
> from ExternalSorter#partitionedIterator.  I think maybe
> !ordering.isDefined should also include "&& !aggregator.isDefined"
>
>     if (spills.isEmpty && partitionWriters == null) {
>       // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
>       // we don't even need to sort by anything other than partition ID
>       if (!ordering.isDefined) {
>         // The user hasn't requested sorted keys, so only sort by partition ID, not key
>         groupByPartition(collection.destructiveSortedIterator(partitionComparator))
>       } else {
>         // We do need to sort by both partition ID and key
>         groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator))
>       }
>
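The (partition, hash) scheme proposed in the quoted message can be sketched in Scala as follows. The sketch rests on several assumptions. Records are (partitionId, key, value) triples. The reducer receives one partition's records already sorted by key hash code; merging several map outputs by hash code is left out. partitionHashOrdering and combineByHashRuns are hypothetical names, not Spark APIs. Because distinct keys can share a hash code, the sketch buffers every record in the current hash run and compares keys with == inside that run. That per-run buffer is the "data for a single int hashCode" the proposal refers to.

```scala
import scala.collection.mutable

object HashRunMergeSketch {
  // Hypothetical (partitionId, key.hashCode) ordering over (partitionId, key, value) records.
  def partitionHashOrdering[K, V]: Ordering[(Int, K, V)] =
    Ordering.by[(Int, K, V), (Int, Int)](r => (r._1, r._2.hashCode))

  // Combine one partition's records, already sorted by key hash code. Only the
  // records sharing the current hash code are buffered; within that run keys are
  // compared with ==, so colliding but distinct keys are kept apart.
  def combineByHashRuns[K, V](sortedByHash: Iterator[(K, V)], mergeValue: (V, V) => V): Iterator[(K, V)] = {
    val in = sortedByHash.buffered
    val runs = new Iterator[List[(K, V)]] {
      def hasNext: Boolean = in.hasNext
      def next(): List[(K, V)] = {
        val h = in.head._1.hashCode
        // LinkedHashMap keeps keys in first-seen order within the run.
        val run = mutable.LinkedHashMap.empty[K, V]
        while (in.hasNext && in.head._1.hashCode == h) {
          val (k, v) = in.next()
          run(k) = run.get(k).map(mergeValue(_, v)).getOrElse(v)
        }
        run.toList
      }
    }
    runs.flatMap(run => run)
  }

  def main(args: Array[String]): Unit = {
    // "Aa" and "BB" are distinct keys with the same hash code (2112).
    val records = Seq((1, "x", 1), (0, "BB", 10), (0, "Aa", 1), (0, "Aa", 100))
    // Sort once by (partitionId, hash); Seq.sorted is stable, so ties keep input order.
    val sorted = records.sorted(partitionHashOrdering[String, Int])
    val partition0 = sorted.iterator.filter(_._1 == 0).map(r => (r._2, r._3))
    println(combineByHashRuns(partition0, (a: Int, b: Int) => a + b).toList)
    // prints List((BB,10), (Aa,101))
  }
}
```

In this sketch the memory held at any moment is bounded by the number of records sharing one hash code, so heavily colliding or skewed keys still have to be buffered in full for their run.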
