GitHub user JoshRosen opened a pull request:
https://github.com/apache/spark/pull/2890
[SPARK-3426] Fix sort-based shuffle error when spark.shuffle.compress and spark.shuffle.spill.compress settings are different
This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the
`spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have
different values.

The problem is that sort-based shuffle's read and write paths use different
settings to decide whether to apply compression. ExternalSorter writes runs
to files using `TempBlockId` ids, so `spark.shuffle.spill.compress` controls
compression on the write side; but these spilled files end up being shuffled
over the network and read back as shuffle files (using `ShuffleBlockId`) by
BlockStoreShuffleFetcher, so `spark.shuffle.compress` controls decompression
on the read side. Errors occur whenever the two settings disagree.
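For reference, the mismatch can be set up with a configuration like the
following (a hypothetical minimal sketch; the exact job that triggers the
spill-then-fetch path will vary):

```scala
import org.apache.spark.SparkConf

// Conflicting settings: spilled files are written under one codec decision
// and read back under the other, producing stream-corruption errors.
val conf = new SparkConf()
  .setAppName("SPARK-3426-repro")
  .set("spark.shuffle.manager", "sort")
  .set("spark.shuffle.compress", "false")
  .set("spark.shuffle.spill.compress", "true")
```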
Based on the discussions in #2247 and #2178, it sounds like we don't want to
remove the `spark.shuffle.spill.compress` setting. Therefore, I've tried to
come up with a fix where `spark.shuffle.spill.compress` is used to compress
data that's read and written locally and `spark.shuffle.compress` is used to
compress any data that will be fetched / read as shuffle blocks.
To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId`
and `TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and
`spark.shuffle.compress`, respectively. ExternalAppendOnlyMap also uses temp
blocks for spilling data. It looks like ExternalSorter was designed to be a
generic sorter, but its configuration is already tied to sort-based shuffle,
so I think it's fine to use `spark.shuffle.compress` to compress its spills;
we can move the compression configuration into the constructor in a later
commit if we find that ExternalSorter is being used in other contexts where
different configuration options should control compression. To summarize:
**Before:**
| | ExternalAppendOnlyMap | ExternalSorter |
|-------|------------------------------|------------------------------|
| Read | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |
**After:**
| | ExternalAppendOnlyMap | ExternalSorter |
|-------|------------------------------|------------------------|
| Read | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |
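The mapping in the tables above comes down to which setting the block
manager consults for a given block id. A simplified sketch of that dispatch
(the case-class names match the PR; the class and boolean field names here
are illustrative, not Spark's actual internals):

```scala
import java.util.UUID

// Simplified sketch of how a block id determines which compression
// setting applies. Sealed trait + case classes mirror Spark's BlockId style.
sealed trait BlockId
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId
case class TempLocalBlockId(id: UUID) extends BlockId
case class TempShuffleBlockId(id: UUID) extends BlockId

class CompressionSettings(compressShuffle: Boolean, compressSpill: Boolean) {
  def shouldCompress(blockId: BlockId): Boolean = blockId match {
    // Blocks that are (or will be served as) shuffle output follow
    // spark.shuffle.compress, so writers and readers always agree.
    case _: ShuffleBlockId     => compressShuffle
    case _: TempShuffleBlockId => compressShuffle
    // Purely local spill files follow spark.shuffle.spill.compress.
    case _: TempLocalBlockId   => compressSpill
    case _                     => false
  }
}
```

With this split, a spill destined to be fetched as shuffle output is both
written and read under `spark.shuffle.compress`, so the two settings can no
longer disagree on the same file.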
Thanks to @andrewor14 for debugging this with me!
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark SPARK-3426
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/2890.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2890
----
commit 76ca65e0c36fca292e32cb948d5507a4270c24e0
Author: Josh Rosen <[email protected]>
Date: 2014-10-21T23:51:17Z
Add regression test for SPARK-3426.
commit ab52f9b4aa70913e3cf25eb0c81c3b900de7e0ed
Author: Josh Rosen <[email protected]>
Date: 2014-10-22T06:56:24Z
Fix SPARK-3426.
----