One thing I have noticed with ExternalSorter is that if an ordering is not defined, it sorts using only the partition_id instead of (partition_id, hash). This means that on the reduce side you need to pull the entire dataset into memory before you can begin iterating over the results.
Since we are sorting the data anyway, I figure it should not be noticeably more expensive to sort by (partition_id, hash). That way the reducer can do a merge, and it only has to hold in memory the data for a single int hashCode before it can combine them and start returning results from the iterator.

Has this already been discussed? If so, can someone point me in the right direction to find out more?

Thanks for any help!
-jc

p.s. I am using Spark version 1.3.1. The code I am looking at below is from ExternalSorter#partitionedIterator. I think maybe the !ordering.isDefined check should also include "&& !aggregator.isDefined".

    if (spills.isEmpty && partitionWriters == null) {
      // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
      // we don't even need to sort by anything other than partition ID
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        groupByPartition(collection.destructiveSortedIterator(partitionComparator))
      } else {
        // We do need to sort by both partition ID and key
        groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator))
      }
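
To make the (partition_id, hash) idea concrete, here is a minimal standalone sketch of what I have in mind. It does not use any Spark classes: the names PartitionHashSortSketch, Record, partitionHashOrdering and combineSorted are hypothetical and made up for illustration, and the record shape ((partitionId, key), value) is only an assumption loosely modelled on how ExternalSorter keys its collection. It shows a comparator on (partition_id, key.hashCode) and a reader that combines values while buffering only the records that share one hash code.

```scala
import scala.collection.mutable

// Hypothetical sketch, not Spark code: all names here are invented for illustration.
object PartitionHashSortSketch {
  // Assumed record shape: ((partitionId, key), value).
  type Record[K, V] = ((Int, K), V)

  // Order by partition ID first, then by the key's hashCode.
  // Integer.compare avoids the overflow that subtracting two hash codes could cause.
  def partitionHashOrdering[K, V]: Ordering[Record[K, V]] =
    new Ordering[Record[K, V]] {
      def compare(a: Record[K, V], b: Record[K, V]): Int = {
        val byPartition = Integer.compare(a._1._1, b._1._1)
        if (byPartition != 0) byPartition
        else Integer.compare(a._1._2.hashCode, b._1._2.hashCode)
      }
    }

  // Consumes input that is already sorted by (partition, hash) and combines values per key.
  // Only the records sharing one (partition, hash) pair are held in memory at a time.
  def combineSorted[K, V](sorted: Iterator[Record[K, V]])(merge: (V, V) => V): Iterator[Record[K, V]] = {
    val in = sorted.buffered
    val groups = new Iterator[Iterator[Record[K, V]]] {
      def hasNext: Boolean = in.hasNext
      def next(): Iterator[Record[K, V]] = {
        val (partition, firstKey) = in.head._1
        val hash = firstKey.hashCode
        // Distinct keys can collide on hashCode, so separate them by key equality.
        val group = mutable.LinkedHashMap.empty[K, V]
        while (in.hasNext && in.head._1._1 == partition && in.head._1._2.hashCode == hash) {
          val ((_, key), value) = in.next()
          group(key) = group.get(key).map(merge(_, value)).getOrElse(value)
        }
        group.iterator.map { case (key, value) => ((partition, key), value) }
      }
    }
    groups.flatMap(group => group)
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(((1, "b"), 1), ((0, "a"), 1), ((1, "b"), 2), ((0, "c"), 5), ((0, "a"), 3))
    val sorted = records.sorted(partitionHashOrdering[String, Int])
    combineSorted(sorted.iterator)(_ + _).foreach(println)
  }
}
```

With only a partition_id sort, records for the same key can sit anywhere within a partition, so nothing can be emitted until the whole partition has been read. With the ordering above, equal keys are adjacent (up to hash collisions), which is what lets the reader emit each group as soon as the hash code changes.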