Hi Karuppayya,

Thanks for putting this together - I've been following the discussion on
this interesting topic.

I really like the simplicity of not needing an external shuffle service.
Push-based shuffle works, but managing the ESS adds operational complexity.
Having this native in Spark keeps things simpler and would be a real
operational win.

Wenchen raised an interesting point about AQE integration being a key
differentiator. I agree that having Spark dynamically decide whether
consolidation is worth it based on actual runtime stats could be really
powerful.

This got me thinking: what if we approach this incrementally?

*Start with local consolidation + AQE* - prove out the consolidation stage
concept and show it helps with IO efficiency. This is simpler to implement
and we can measure the impact on real workloads without dealing with remote
storage complexity yet.

*Then add remote storage* - once we've proven the consolidation stage is
beneficial, layer in the elasticity benefits of remote storage that your
proposal focuses on.

Wenchen mentioned the consolidation stage can be valuable even without
remote storage, and that feels right to me. It would also help address
Mridul and Yang's concerns about overlap with existing projects - if we can
show the AQE-integrated consolidation has unique benefits first, it makes
the case stronger.
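
To make the AQE angle concrete, here is the rough shape of the decision I
have in mind, using the runtime map output statistics AQE already collects.
This is only a sketch - the function name and threshold are made up, and the
average-block-size formula is the one Wenchen described
(total_shuffle_size / (num_mappers * num_reducers)):

// Hypothetical sketch, not the SPIP's API: decide at AQE time whether the
// extra consolidation stage is worth launching, based on the average
// shuffle block size observed at runtime.
def shouldConsolidate(
    totalShuffleBytes: Long,
    numMappers: Int,
    numReducers: Int,
    minBlockBytes: Long = 256L * 1024): Boolean = {
  // avg block size = total_shuffle_size / (num_mappers * num_reducers)
  val avgBlockBytes =
    totalShuffleBytes.toDouble / (numMappers.toLong * numReducers)
  // Only pay for the extra stage when individual blocks are so small that
  // fetching them one by one would dominate the shuffle read time.
  avgBlockBytes < minBlockBytes
}

If the runtime stats show that blocks are already reasonably large, AQE
simply skips the stage, which also covers the point about bypassing
consolidation for small jobs at runtime.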

A few implementation questions I'm curious about:

- For large/skewed partitions, could we split consolidation across multiple
tasks? For example, if partition X is 5GB, five consolidators could each
handle ~1GB worth of mapper outputs (rough sketch after these questions).
That's still far better than fetching from thousands of mappers, and it
keeps disk/memory bounded per task.
- For the remote storage phase, would it support streaming writes (e.g. S3
multipart) to avoid holding huge merged partitions in memory or on disk?
(Also sketched below.)
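
For the first question, a purely illustrative sketch of what I mean by
bounding each consolidator's share of a skewed partition - MapOutputInfo,
planConsolidators and targetBytes are names I made up, not anything from the
design doc:

// Group the mapper outputs feeding one (skewed) reducer partition into
// consolidation tasks, each capped at roughly targetBytes of input.
case class MapOutputInfo(mapId: Long, bytesForReducer: Long)

def planConsolidators(
    outputs: Seq[MapOutputInfo],
    targetBytes: Long): Seq[Seq[MapOutputInfo]] = {
  val groups = Seq.newBuilder[Seq[MapOutputInfo]]
  var current = Vector.empty[MapOutputInfo]
  var currentBytes = 0L
  outputs.foreach { o =>
    // Start a new consolidator once the current one reaches its budget.
    if (currentBytes + o.bytesForReducer > targetBytes && current.nonEmpty) {
      groups += current
      current = Vector.empty
      currentBytes = 0L
    }
    current :+= o
    currentBytes += o.bytesForReducer
  }
  if (current.nonEmpty) groups += current
  groups.result()
}

A 5GB partition with a 1GB target yields roughly five groups; each
consolidator fetches only its group's blocks and writes one segment of the
consolidated partition.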
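
For the second question, a hedged sketch of what streaming the consolidated
output could look like if the writer goes through the Hadoop FileSystem API
as the SPIP proposes. My understanding is that for s3a:// paths the returned
output stream already uploads in multipart chunks (fs.s3a.multipart.size),
so the consolidator only ever holds a small copy buffer; streamConsolidate
and the buffer size are illustrative, not from the PoC:

import java.io.InputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Stream fetched mapper blocks straight into one consolidated file on the
// remote FileSystem, without materializing the merged partition locally.
def streamConsolidate(
    blockStreams: Iterator[InputStream],
    dest: Path,
    conf: Configuration): Unit = {
  val fs: FileSystem = dest.getFileSystem(conf)
  val out = fs.create(dest, /* overwrite = */ true)
  val buf = new Array[Byte](1024 * 1024) // 1 MiB copy buffer
  try {
    blockStreams.foreach { in =>
      try {
        var n = in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          n = in.read(buf)
        }
      } finally {
        in.close()
      }
    }
  } finally {
    out.close()
  }
}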

Your proposal addresses a real problem - large shuffles with millions of
small blocks are painful in production - and I think it becomes even more
compelling once it is integrated with AQE. The incremental approach might be
a pragmatic way to deliver that value sooner.


On Wed, Dec 3, 2025 at 3:52 PM karuppayya <[email protected]> wrote:

> Wenchen,
> Thanks for being supportive of the idea.
> I think I missed addressing some parts of your first email.
>
> *we can decrease the number of nodes that host active shuffle data, so
>> that the cluster is more elastic.*
>
>
> - We can write to local storage since we are using the Hadoop API.
> - We could optimize task placement to run on hosts containing active
> shuffle data.
> - However, this poses a risk of disks filling up sooner (especially with
> large shuffles), leading to task failures. This issue is amplified in
> multi-user/session environments (e.g., Spark Connect) where unrelated jobs
> might fill the disk, causing confusing diagnostic issues.
> - *I propose offering this as an optional setting/config that
> knowledgeable users can enable, particularly in environments with
> sufficient local storage capacity.*
>
> *push-based shuffle framework to support other deployments (standalone,
>> k8s) and remote storage?*
>
>
> - Since the existing framework is YARN-dependent, retrofitting it for
> other resource managers would require extensive logic for handling resource
> management and shuffle service deployment differences.
> - I prefer keeping the consolidation stage separate for a cleaner, more
> generalized design.
> *- However, if a significant architectural overlap becomes apparent, then
> integration should certainly be reconsidered.*
>
>
> Hi Enrico,
> Thanks for the ideas.
> I think double writes (either as a direct write or a copy after a local
> write) would
> - Increase overhead and add more failure points.
> - Keep executors holding onto shuffle data, preventing aggressive cluster
> downscaling.
> *While local reads have slightly lower latency, I think the delta between
> local and remote reads is small compared to overall job time.*
> For small jobs, it would be beneficial to bypass the shuffle consolidation
> stage dynamically at runtime to avoid unnecessary overhead.
>
> * Is "shuffle consolidation" the preferred term?*
>
>
> While the existing Spark term is "shuffle merge," which also involves
> combining shuffle blocks, I am using "shuffle consolidation" mainly for
> disambiguation.
> *- Shuffle merge* - combining a few shuffle blocks
> *- Shuffle consolidation* - merging all fragmented shuffle blocks for a
> given reducer partition
> *However, I don't have a strong opinion on which term is ultimately used,
> as long as the function is clearly understood.*
>
> Regards
> Karuppayya
>
>
> On Wed, Dec 3, 2025 at 2:37 AM Enrico Minack <[email protected]> wrote:
>
>> Hi Karuppayya,
>>
>> Thanks for the clarification.
>>
>> I would like to emphasize that a solution that allows preferring shuffle
>> data from executors over remote storage would be great:
>> If the shuffle consolidation*) stage merges mapper outputs into reducer
>> inputs and stores those on a remote storage, it could easily keep a copy on
>> the consolidating executors. For the lifetime of these executors, reducer
>> executors could preferably fetch the consolidated shuffle data from the
>> consolidating executors, with the obvious benefits. Only for decommissioned
>> consolidation executors or consolidation executors on failed nodes, reducer
>> executors fetch consolidated shuffle data from the remote storage.
>>
>> Further, splitting the proposed shuffle consolidation stage into:
>> 1) consolidate shuffle data to executor local shuffle storage (AQE-based
>> shuffle consolidation stage as proposed by Wenchen Fan)
>> 2) replicate local shuffle storage to remote shuffle storage
>> feels like a natural separation of concerns. And it allows for custom
>> configuration to employ only the former without the latter or vice versa.
>>
>> Speaking of the ability to read shuffle data from executors during their
>> existence while falling back to the remote storage replica: this reminds
>> me of the existing Fallback Storage logic. Feature 2) could be implemented
>> by evolving existing infrastructure (Fallback Storage) with only a hundred
>> lines of code. The read-from-executors-first-then-remote-storage feature
>> would cost a few hundred more lines of code.
>>
>> Cheers,
>> Enrico
>>
>>
>> *) Is "shuffle consolidation" the preferred term? Isn't the existing
>> Spark term "shuffle merge" exactly what is being described here, maybe with
>> some small extension?
>>
>>
>> Am 01.12.25 um 20:30 schrieb karuppayya:
>>
>> Hi everyone,
>> Thank you all for your valuable comments and discussion on the design
>> document/this email. I have replied to the comments/concerns raised.
>> I welcome any other questions and to be challenged further.
>>
>> Also, *Sun Chao* agreed to shepherd this proposal (thank you!)
>>
>> If there are no other open questions by Wednesday morning (PST), I will
>> request Chao to open the official voting thread (which should give 72
>> hours for the process
>> <https://spark.apache.org/improvement-proposals.html>).
>>
>> - Karuppayya
>>
>> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
>> [email protected]> wrote:
>>
>>> One aspect that hasn’t been mentioned yet (or so I think) is the
>>> thread-level behavior of shuffle. In large workloads with many small
>>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of
>>> threads tied to shuffle fetch operations, Netty client handlers, and block
>>> file access.
>>> Since the proposal changes block granularity and fetch patterns, it
>>> would be valuable to explicitly consider how the consolidation stage
>>> affects:
>>> – the number of concurrent fetch operations
>>> – thread pool growth / saturation
>>> – Netty transport threads
>>> – memory pressure from large in-flight reads
>>>
>>> Btw, I find your proposal quite interesting.
>>>
>>> El mar, 18 nov 2025, 19:33, karuppayya <[email protected]>
>>> escribió:
>>>
>>>> Rishab, Wenchen, Murali,
>>>> Thank you very much for taking the time to review the proposal and for
>>>> providing such thoughtful and insightful comments/questions.
>>>>
>>>> *Rishab*,
>>>>
>>>>> * suitable across all storage systems*
>>>>
>>>> You are right that the suitability is somewhat subjective and dependent
>>>> on the cloud provider used.
>>>> In general, the goal of ShuffleVault is to utilize the standard Hadoop
>>>> FileSystem APIs, which means it should work seamlessly with popular cloud
>>>> and distributed file systems (like S3, HDFS, GFS, etc.).
>>>> These systems share a similar nature and are designed for large files.
>>>>
>>>> *large file could create problems during retrieval.*
>>>>
>>>> We are mitigating this risk by ensuring that tasks do not read the
>>>> entire consolidated file at once.
>>>> Instead, the implementation is designed to read the data in configured
>>>> blocks, rather than relying on a single read. *This behavior can be
>>>> refined/validated* to make it more robust.
>>>>
>>>> *Wenchen,*
>>>> I fully agree that the operational details around using cloud storage
>>>> for shuffle - specifically traffic throttling, cleanup guarantees, and
>>>> overall request-related network cost - are critical issues that must be
>>>> solved.
>>>> The consolidation stage is explicitly designed to mitigate the
>>>> throttling and accompanying cost issues.
>>>> *Throttling* - By consolidating shuffle data, this approach transforms
>>>> the read pattern from a multitude of small, random requests into fewer,
>>>> large, targeted ones, which is particularly beneficial for modern cloud
>>>> object storage.
>>>> *Shuffle cleanup* - I am actively trying to leverage the shuffle cleanup
>>>> modes and also making an effort to make them more robust. These cleanup
>>>> improvements should be beneficial regardless of this proposal and cover
>>>> most cases.
>>>> However, I agree that to ensure *no orphaned files* remain, we will
>>>> still require other means (such as remote storage lifecycle policies or
>>>> job-specific scripts) for a guaranteed cleanup.
>>>> Thank you again for your valuable feedback, especially the validation
>>>> on synchronous scheduling and AQE integration.
>>>>
>>>> *Murali,*
>>>>
>>>>> * Doing an explicit sort stage*
>>>>
>>>> To clarify, ShuffleVault does not introduce an explicit sort stage.
>>>> Instead, it introduces a Shuffle Consolidation Stage.
>>>> This stage is a pure passthrough operation that only aggregates
>>>> scattered shuffle data for a given reducer partition.
>>>> In simple terms, it functions as an additional reducer stage that reads
>>>> the fragmented shuffle files from the mappers and writes them as a single,
>>>> consolidated, durable file to remote storage.
>>>>
>>>> *but that would be a nontrivial change *
>>>>
>>>> I agree that the change is significant, and I am actively working to
>>>> ensure the benefits are leveraged across the stack. This PR
>>>> <https://github.com/apache/spark/pull/53028> demonstrates integration
>>>> with AQE and interactions with other rules (Exchange reuse, Shuffle
>>>> Partition Coalescing, etc.).
>>>> I would genuinely appreciate it if you could take a look at the POC PR
>>>> to see the scope of changes. The primary logic is encapsulated within a
>>>> new Spark Physical Planner Rule
>>>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
>>>> that injects the consolidation stage, which is the main crux.
>>>>
>>>> I welcome any further questions or comments!
>>>>
>>>> Thanks & Regards
>>>> Karuppayya
>>>>
>>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> There are existing Apache projects which provide the capabilities
>>>>> which largely addresses the problem statement - Apache Celeborn, Apache
>>>>> Uniffle, Zeus, etc.
>>>>> Doing an explicit sort stage, between "map" and "reduce" brings with
>>>>> it some nice advantages, especially if the output is durable, but that
>>>>> would be a nontrivial change - and should be attempted if the benefits are
>>>>> being leveraged throughout the stack (AQE, speculative execution, etc)
>>>>>
>>>>> Regards,
>>>>> Mridul
>>>>>
>>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Karuppayya,
>>>>>>
>>>>>> Handling large shuffles in Spark is challenging and it's great to see
>>>>>> proposals addressing it. I think the extra "shuffle consolidation stage" 
>>>>>> is
>>>>>> a good idea, and now I feel it's better for it to be synchronous, so that
>>>>>> we can integrate it with AQE and leverage the accurate runtime shuffle
>>>>>> status to make decisions about whether or not to launch this extra 
>>>>>> "shuffle
>>>>>> consolidation stage" and how to consolidate. This is a key differentiator
>>>>>> compared to the push-based shuffle.
>>>>>>
>>>>>> However, there are many details to consider, and in general it's
>>>>>> difficult to use cloud storage for shuffle. We need to deal with problems
>>>>>> like traffic throttling, cleanup guarantee, cost control, and so on. 
>>>>>> Let's
>>>>>> take a step back and see what are the actual problems of large shuffles.
>>>>>>
>>>>>> Large shuffle usually starts with a large number of mappers that we
>>>>>> can't adjust (e.g. large table scan). We can adjust the number of 
>>>>>> reducers
>>>>>> to reach two goals:
>>>>>> 1. The input data size of each reducer shouldn't be too large, which
>>>>>> is roughly *total_shuffle_size / num_reducers*. This is to avoid
>>>>>> spilling/OOM during reducer task execution.
>>>>>> 2. The data size of each shuffle block shouldn't be too small, which
>>>>>> is roughly *total_shuffle_size / (num_mappers * num_reducers)*. This
>>>>>> is for the good of disk/network IO.
>>>>>>
>>>>>> These two goals are actually contradictory and sometimes we have to
>>>>>> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the
>>>>>> query can finish. An extra "shuffle consolidation stage" can kind of
>>>>>> decrease the number of mappers, by merging the shuffle files from 
>>>>>> multiple
>>>>>> mappers. This can be a clear win as fetching many small shuffle blocks 
>>>>>> can
>>>>>> be quite slow, even slower than running an extra "shuffle consolidation
>>>>>> stage".
>>>>>>
>>>>>> In addition, the nodes that host shuffle files shouldn't be too many
>>>>>> (best to be 0 which means shuffle files are stored in a different 
>>>>>> storage).
>>>>>> With a large number of mappers, likely every node in the cluster stores
>>>>>> some shuffle files. By merging shuffle files via the extra "shuffle
>>>>>> consolidation stage", we can decrease the number of nodes that host 
>>>>>> active
>>>>>> shuffle data, so that the cluster is more elastic.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Wenchen
>>>>>>
>>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karuppayya,
>>>>>>>
>>>>>>> Thanks for sharing the proposal and this looks very exciting.
>>>>>>>
>>>>>>> I have a few questions and please correct me if I misunderstood
>>>>>>> anything.
>>>>>>>
>>>>>>> Would it be possible to clarify whether the consolidated shuffle
>>>>>>> file produced for each partition is suitable across all storage systems,
>>>>>>> especially when this file becomes extremely large? I am wondering if a 
>>>>>>> very
>>>>>>> large file could create problems during retrieval. For example, if a
>>>>>>> connection breaks while reading the file, some storage systems may not
>>>>>>> support resuming reads from the point of failure and start reading the 
>>>>>>> file
>>>>>>> from the beginning again. This could lead to higher latency, repeated
>>>>>>> retries, or performance bottlenecks when a partition becomes too large 
>>>>>>> or
>>>>>>> skewed?
>>>>>>>
>>>>>>> Would it make sense to introduce a configurable upper-bound on the
>>>>>>> maximum allowed file size? This might prevent the file from growing
>>>>>>> massively.
>>>>>>> Should the consolidated shuffle file be compressed before being
>>>>>>> written to the storage system? Compression might introduce additional
>>>>>>> latency but that too can be a configurable option.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Rishab Joshi
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Enrico,
>>>>>>>> Thank you very much for reviewing the doc.
>>>>>>>>
>>>>>>>> *Since the consolidation stage reads all the shuffle data, why not
>>>>>>>>> doing the transformation in that stage? What is the point in 
>>>>>>>>> deferring the
>>>>>>>>> transformations into another stage?*
>>>>>>>>
>>>>>>>>
>>>>>>>> The reason for deferring the final consolidation to a subsequent
>>>>>>>> stage lies in the distributed nature of shuffle data.
>>>>>>>> A reducer requires reading all corresponding shuffle data written
>>>>>>>> across all map tasks. Since each mapper only holds its own local
>>>>>>>> output, the consolidation cannot begin until the entire map stage
>>>>>>>> completes.
>>>>>>>>
>>>>>>>> However, your question is also aligned to one of the approaches
>>>>>>>> mentioned (concurrent consolidation
>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
>>>>>>>> which was specifically considered.
>>>>>>>> While the synchronous consolidation happens after all the data is
>>>>>>>> available, concurrent consolidation can aggregate and persist the
>>>>>>>> already-generated shuffle data to begin concurrently with the 
>>>>>>>> remaining map
>>>>>>>> tasks, thereby making the shuffle durable much earlier instead of 
>>>>>>>> having to
>>>>>>>> wait for all map tasks to complete.
>>>>>>>>
>>>>>>>> - Karuppayya
>>>>>>>>
>>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>>>> As long as the map executors are alive, reduce executors should
>>>>>>>>> read from them to avoid any extra delay / overhead.
>>>>>>>>> On fetch failure from a map executor, the reduce executors should
>>>>>>>>> fall back to a remote storage that provides a copy (merged or not) of 
>>>>>>>>> the
>>>>>>>>> shuffle data.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Enrico
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Am 13.11.25 um 09:42 schrieb Enrico Minack:
>>>>>>>>>
>>>>>>>>> Hi Karuppayya,
>>>>>>>>>
>>>>>>>>> thanks for your proposal and bringing up this issue.
>>>>>>>>>
>>>>>>>>> I am very much in favour of a shuffle storage solution that allows
>>>>>>>>> for dynamic allocation and node failure in a K8S environment, without
>>>>>>>>> the burden of managing a Remote Shuffle Service.
>>>>>>>>>
>>>>>>>>> I have the following comments:
>>>>>>>>>
>>>>>>>>> Your proposed consolidation stage is equivalent to the next
>>>>>>>>> reducer stage in the sense that it reads shuffle data from the 
>>>>>>>>> earlier map
>>>>>>>>> stage. This requires the executors of the map stage to survive until 
>>>>>>>>> the
>>>>>>>>> shuffle data are consolidated ("merged" in Spark terminology). 
>>>>>>>>> Therefore, I
>>>>>>>>> think this passage of your design document is not accurate:
>>>>>>>>>
>>>>>>>>>     Executors that perform the initial map tasks (shuffle writers)
>>>>>>>>> can be immediately deallocated after writing their shuffle data ...
>>>>>>>>>
>>>>>>>>> Since the consolidation stage reads all the shuffle data, why not
>>>>>>>>> doing the transformation in that stage? What is the point in 
>>>>>>>>> deferring the
>>>>>>>>> transformations into another stage?
>>>>>>>>>
>>>>>>>>> You mention the "Native Shuffle Block Migration" and say its
>>>>>>>>> limitation is "It simply shifts the storage burden to other active
>>>>>>>>> executors".
>>>>>>>>> Please consider that the migration process can migrate to a (in
>>>>>>>>> Spark called) fallback storage, which essentially copies the shuffle 
>>>>>>>>> data
>>>>>>>>> to a remote storage.
>>>>>>>>> Kind regards,
>>>>>>>>> Enrico
>>>>>>>>>
>>>>>>>>> Am 13.11.25 um 01:40 schrieb karuppayya:
>>>>>>>>>
>>>>>>>>>  Hi All,
>>>>>>>>>
>>>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively
>>>>>>>>> in Spark* .
>>>>>>>>>
>>>>>>>>> This approach would fundamentally decouple shuffle storage from
>>>>>>>>> compute nodes, mitigating *shuffle fetch failures and also help
>>>>>>>>> with aggressive downscaling*.
>>>>>>>>>
>>>>>>>>> The primary goal is to enhance the *elasticity and resilience* of
>>>>>>>>> Spark workloads, leading to substantial cost optimization 
>>>>>>>>> opportunities.
>>>>>>>>>
>>>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
>>>>>>>>> *Looking forward to your feedback! *
>>>>>>>>>
>>>>>>>>> JIRA: SPARK-53484
>>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>>>> SPIP doc
>>>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>>>>>>> ,
>>>>>>>>> Design doc
>>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Karuppayya
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Regards
>>>>>>> Rishab Joshi
>>>>>>>
>>>>>>
>>
