The relevant JIRA that springs to mind is https://issues.apache.org/jira/browse/SPARK-2926
If an aggregator and ordering are both defined, then the map side of sort-based shuffle will sort based on the key ordering so that map-side spills can be efficiently merged. We do not currently do a sort-based merge on the reduce side; implementing this is a little tricky because it will require more map partitions' output to be buffered on the reduce side. I think that SPARK-2926 has some proposals for how to deal with this, including hierarchical merging of reduce outputs.

RE: ExternalSorter#partitionedIterator, I don't think it's safe to do !ordering.isDefined && !aggregator.isDefined. If an aggregator is defined but we don't have an ordering, then I don't think it makes sense to sort the keys based on their hashcodes or some default ordering, since hashcode collisions would lead to incorrect results for sort-based aggregation.

On Tue, Jun 2, 2015 at 1:50 PM, John Carrino <john.carr...@gmail.com> wrote:

> One thing I have noticed with ExternalSorter is that if an ordering is not
> defined, it does the sort using only the partition_id, instead of
> (partition_id, hash). This means that on the reduce side you need to pull
> the entire dataset into memory before you can begin iterating over the
> results.
>
> I figure since we are doing a sort of the data anyway it doesn't seem more
> expensive to sort by (partition, hash). That way the reducer can do a merge
> and only has to hold in memory the data for a single int hashCode before
> it can combine them and start returning results from the iterator.
>
> Has this already been discussed? If so, can someone point me in the right
> direction to find out more?
>
> Thanks for any help!
> -jc
>
> p.s. I am using spark version 1.3.1. The code I am looking at below is
> from ExternalSorter#partitionedIterator.
> I think maybe !ordering.isDefined should also include
> "&& !aggregator.isDefined"
>
> if (spills.isEmpty && partitionWriters == null) {
>   // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
>   // we don't even need to sort by anything other than partition ID
>   if (!ordering.isDefined) {
>     // The user hasn't requested sorted keys, so only sort by partition ID, not key
>     groupByPartition(collection.destructiveSortedIterator(partitionComparator))
>   } else {
>     // We do need to sort by both partition ID and key
>     groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator))
>   }
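
To make the hashcode-collision concern concrete, here is a rough standalone sketch. It is not Spark code: mergeAdjacent, the object name, and the sample records are all made up for illustration, and it assumes a sort-based aggregation that only combines neighbouring records. "Aa" and "BB" are two distinct strings that happen to share a hashCode, so sorting by hash alone leaves their records interleaved.

```scala
// Illustration only, not Spark code. All names here are hypothetical.
object HashSortAggregationSketch {

  // Sort-based aggregation in miniature: walk the records in order and
  // combine each record into the previous group when `sameGroup` says so.
  def mergeAdjacent[K, V](records: Seq[(K, V)])
                         (sameGroup: (K, K) => Boolean)
                         (combine: (V, V) => V): List[(K, V)] =
    records.foldLeft(List.empty[(K, V)]) {
      case ((k, acc) :: rest, (key, value)) if sameGroup(k, key) =>
        (k, combine(acc, value)) :: rest
      case (out, record) =>
        record :: out
    }.reverse

  // "Aa" and "BB" are different keys with the same hashCode.
  val records: Seq[(String, Int)] =
    Seq("Aa" -> 1, "BB" -> 10, "Aa" -> 2, "BB" -> 20)

  // sortBy is stable, so records with equal hash codes keep their input
  // order and the two keys stay interleaved.
  val sortedByHash: Seq[(String, Int)] = records.sortBy(_._1.hashCode)

  // Treating "equal hash" as "same key" merges two distinct keys.
  val groupedByHash: List[(String, Int)] =
    mergeAdjacent(sortedByHash)(_.hashCode == _.hashCode)(_ + _)

  // Comparing real keys on hash-sorted input never sees equal neighbours,
  // so each key shows up more than once in the output.
  val groupedByKeyAfterHashSort: List[(String, Int)] =
    mergeAdjacent(sortedByHash)(_ == _)(_ + _)

  // With a real key ordering, equal keys are adjacent and the merge is correct.
  val groupedByKeyAfterKeySort: List[(String, Int)] =
    mergeAdjacent(records.sortBy(_._1))(_ == _)(_ + _)

  def main(args: Array[String]): Unit = {
    println(groupedByHash)             // List((Aa,33))
    println(groupedByKeyAfterHashSort) // List((Aa,1), (BB,10), (Aa,2), (BB,20))
    println(groupedByKeyAfterKeySort)  // List((Aa,3), (BB,30))
  }
}
```

Only the last variant gives one correct total per key. A hash-based scheme could still be made to work if the reducer buffered every record sharing a hashCode and then grouped by key equality within that buffer, but that is extra logic beyond just changing the sort comparator.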