Qiegang, Thanks for the feedback. *Chao sent an email with a similar thought to yours (which I learned about offline).* But somehow it seems to have been lost. (Also, the Apache Ponymail archive <https://lists.apache.org/thread/d49b3w8s4t3ob6kgyrtxkc7p6qbl3ssd> is missing a few responses, e.g. my last response and Chao's email. Please let me know who needs to be notified of this.)
*I completely agree with the suggestion to take an incremental approach here.* *For large/skewed partitions* While memory is bounded in my POC (via the ShuffleBlockFetcherIterator settings), I agree that we need to think through the implications for local storage, which presents different challenges than remote storage with respect to disk. *For the remote storage phase, would it support streaming writes* I’ve ensured that writes are streamed <https://github.com/apache/spark/pull/53028/files#diff-eb253db2e3342a7044aa974c0b9f51ac1c4a5d4e1eb390da8bf1351129f514c4R60> in the POC PR. Please verify this when you get a chance to look at the code. *Having this native in Spark keeps things simpler* I also want to highlight why integrating this directly into Spark is valuable. Since many downstream systems (like Gluten/Velox) rely on the Spark Physical Plan, exposing this information there would help them leverage this capability automatically. This ensures out-of-the-box compatibility and eliminates the need for ecosystem projects to implement any additional logic. Mridul, Yang, Wenchen, Chao (and everyone), I would appreciate your thoughts and feedback on this proposal. Your input is critical. Thanks & Regards Karuppayya On Fri, Dec 5, 2025 at 12:23 PM Qiegang Long <[email protected]> wrote: > Hi Karuppayya, > > Thanks for putting this together - I've been following the discussion on > this interesting topic. > > I really like the simplicity of not needing an external shuffle service. > Push-based shuffle works, but managing ESS adds operational complexity. > Having this native in Spark keeps things simpler, and it would be an ops > win. > > Wenchen raised an interesting point about AQE integration being a key > differentiator. I agree that having Spark dynamically decide whether > consolidation is worth it based on actual runtime stats could be really > powerful. > > This got me thinking: what if we approach this incrementally? > > *Start with local consolidation + AQE* - prove out the consolidation > stage concept and show it helps with IO efficiency. This is simpler to > implement and we can measure the impact on real workloads without dealing > with remote storage complexity yet. > > *Then add remote storage* - once we've proven the consolidation stage is > beneficial, layer in the elasticity benefits of remote storage that your > proposal focuses on. > > Wenchen mentioned the consolidation stage can be valuable even without > remote storage, and that feels right to me. It would also help address > Mridul and Yang's concerns about overlap with existing projects - if we can > show the AQE-integrated consolidation has unique benefits first, it makes > the case stronger. > > A few implementation questions I'm curious about: > > - For large/skewed partitions, could we split consolidation across > multiple tasks? Like if partition X is 5GB, maybe 5 consolidators each > handle 1GB worth of mapper outputs. Still way better than fetching from > thousands of mappers, and it keeps disk/memory bounded per task. > - For the remote storage phase, would it support streaming writes (S3 > multipart) to avoid holding huge merged partitions in memory/disk? > > Your proposal addresses a real problem - large shuffles with millions of > small blocks are painful in production, but I think it would be more useful > if it is integrated with AQE. The incremental approach might be a pragmatic > way to deliver value sooner.
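On the streaming-writes question above: the POC goes through the Hadoop FileSystem API, and the sketch below is only an illustration of the general shape of such a streamed consolidation write (the method name, path layout, and chunk size are not taken from the POC). On s3a, large writes issued through FileSystem.create are typically uploaded as multipart parts under the hood, so the full merged reducer partition never needs to be buffered in memory or on local disk.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import java.io.InputStream

    // Minimal sketch (illustrative only): stream mapper block data into a single
    // consolidated file on remote storage, copying in bounded chunks.
    def consolidatePartition(blockStreams: Iterator[InputStream],
                             dest: Path,
                             hadoopConf: Configuration): Unit = {
      val fs = FileSystem.get(dest.toUri, hadoopConf)
      val out = fs.create(dest, true)          // on s3a this is backed by multipart upload for large files
      val buffer = new Array[Byte](1024 * 1024) // copy in 1 MiB chunks
      try {
        blockStreams.foreach { in =>
          try {
            var read = in.read(buffer)
            while (read != -1) {
              out.write(buffer, 0, read)
              read = in.read(buffer)
            }
          } finally in.close()
        }
      } finally out.close()
    }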
> > > On Wed, Dec 3, 2025 at 3:52 PM karuppayya <[email protected]> > wrote: > >> Wenchen, >> Thanks for being supportive of the idea. >> I think I missed addressing some parts of your first email. >> >> *we can decrease the number of nodes that host active shuffle data, so >>> that the cluster is more elastic.* >> >> >> - We can write to local storage since we are using the Hadoop API. >> - We could optimize task placement to run on hosts containing active >> shuffle data. >> - However, this poses a risk of disks filling up sooner (especially with >> large shuffles), leading to task failures. This issue is amplified in >> multi-user/session environments (e.g., Spark Connect) where unrelated jobs >> might fill the disk, causing confusing diagnostic issues. >> - *I propose offering this as an optional setting/config that >> knowledgeable users can enable, particularly in environments with >> sufficient local storage capacity.* >> >> *push-based shuffle framework to support other deployments (standalone, >>> k8s) and remote storage?* >> >> >> - Since the existing framework is YARN-dependent, retrofitting it for >> other resource managers would require extensive logic for handling resource >> management and shuffle service deployment differences. >> - I prefer keeping the consolidation stage separate for a cleaner, more >> generalized design. >> *- However, if a significant architectural overlap becomes apparent, then >> integration should certainly be reconsidered.* >> >> >> Hi Enrico, >> Thanks for the ideas. >> I think double writes (either as a direct write or a copy after a local >> write) would: >> - Increase overhead and add more failure points. >> - Require executors to hold onto shuffle data, preventing aggressive cluster >> downscaling. >> *While local reads have slightly lower latency, I think the delta between local >> and remote reads is small compared to overall job time.* >> For small jobs, it would be beneficial to bypass the shuffle >> consolidation stage dynamically at runtime to avoid unnecessary overhead. >> >> *Is "shuffle consolidation" the preferred term?* >> >> >> While the existing Spark term is "shuffle merge," which also involves >> combining shuffle blocks, I am using "shuffle consolidation" mainly for >> disambiguation. >> *- Shuffle merge* - combining a few shuffle blocks >> *- Shuffle consolidation* - merging all fragmented shuffle blocks for a >> given reducer partition >> *However, I don't have a strong opinion on which term is ultimately used, >> as long as the function is clearly understood.* >> >> Regards >> Karuppayya >> >> >> On Wed, Dec 3, 2025 at 2:37 AM Enrico Minack <[email protected]> >> wrote: >> >>> Hi Karuppayya, >>> >>> Thanks for the clarification. >>> >>> I would like to emphasize that a solution that allows preferring >>> shuffle data from executors over remote storage would be great: >>> If the shuffle consolidation*) stage merges mapper outputs into reducer >>> inputs and stores those on remote storage, it could easily keep a copy on >>> the consolidating executors. For the lifetime of these executors, reducer >>> executors could preferably fetch the consolidated shuffle data from the >>> consolidating executors, with the obvious benefits. Only for decommissioned >>> consolidation executors or consolidation executors on failed nodes would reducer >>> executors fetch the consolidated shuffle data from the remote storage.
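To make the fetch preference Enrico describes concrete, here is a minimal sketch of the idea (an illustration of the suggestion, not existing Spark behavior; fetchFromConsolidator is a stand-in for whatever executor-to-executor fetch mechanism would be used):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import java.io.InputStream
    import scala.util.control.NonFatal

    // A reducer first tries the copy kept on the consolidating executor (network-local, cheap);
    // only if that fetch fails (executor decommissioned, node lost) does it open the durable
    // copy on remote storage.
    def openConsolidatedPartition(fetchFromConsolidator: () => InputStream,
                                  remoteCopy: Path,
                                  hadoopConf: Configuration): InputStream = {
      try {
        fetchFromConsolidator()
      } catch {
        case NonFatal(_) =>
          FileSystem.get(remoteCopy.toUri, hadoopConf).open(remoteCopy)
      }
    }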
>>> >>> Further, splitting the proposed shuffle consolidation stage into: >>> 1) consolidate shuffle data to executor-local shuffle storage (the AQE-based >>> shuffle consolidation stage as proposed by Wenchen Fan) >>> 2) replicate local shuffle storage to remote shuffle storage >>> feels like a natural separation of concerns. And it allows for custom >>> configuration to employ only the former without the latter, or vice versa. >>> >>> The ability to read shuffle data from executors during their >>> existence while falling back to the remote storage replica reminds me of the >>> existing Fallback Storage logic. Feature 2) could be implemented by >>> evolving existing infrastructure (Fallback Storage) with only a hundred lines >>> of code. The read-from-executors-first-then-remote-storage feature would >>> cost a few hundred more lines of code. >>> >>> Cheers, >>> Enrico >>> >>> >>> *) Is "shuffle consolidation" the preferred term? Isn't the existing >>> Spark term "shuffle merge" exactly what is being described here, maybe with >>> some small extension? >>> >>> >>> On 01.12.25 at 20:30, karuppayya wrote: >>> >>> Hi everyone, >>> Thank you all for your valuable comments and discussion on the design >>> document/this email. I have replied to the comments/concerns raised. >>> I welcome any other questions and further challenges. >>> >>> Also, *Sun Chao* has accepted to shepherd this proposal (thank you!). >>> >>> If there are no other open questions by Wednesday morning (PST), I will >>> request Chao to open the official voting thread (which should give 72 >>> hours for the process >>> <https://spark.apache.org/improvement-proposals.html>). >>> >>> - Karuppayya >>> >>> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua < >>> [email protected]> wrote: >>> >>>> One aspect that hasn’t been mentioned yet (or so I think) is the >>>> thread-level behavior of shuffle. In large workloads with many small >>>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of >>>> threads tied to shuffle fetch operations, Netty client handlers, and block >>>> file access. >>>> Since the proposal changes block granularity and fetch patterns, it >>>> would be valuable to explicitly consider how the consolidation stage >>>> affects: >>>> – the number of concurrent fetch operations >>>> – thread pool growth / saturation >>>> – Netty transport threads >>>> – memory pressure from large in-flight reads >>>> >>>> Btw, I find your proposal quite interesting. >>>> >>>> On Tue, Nov 18, 2025, 19:33, karuppayya <[email protected]> >>>> wrote: >>>> >>>>> Rishab, Wenchen, Murali, >>>>> Thank you very much for taking the time to review the proposal and for >>>>> providing such thoughtful and insightful comments/questions. >>>>> >>>>> *Rishab*, >>>>> >>>>>> *suitable across all storage systems* >>>>> >>>>> You are right that the suitability is somewhat subjective and >>>>> dependent on the cloud provider used. >>>>> In general, the goal of ShuffleVault is to utilize the standard Hadoop >>>>> FileSystem APIs, which means it should work seamlessly with popular cloud >>>>> and distributed file systems (like S3, HDFS, GFS, etc.). >>>>> These systems share a similar nature and are designed for large files. >>>>> >>>>> *large file could create problems during retrieval.* >>>>> >>>>> We are mitigating this risk by ensuring that tasks do not read the >>>>> entire consolidated file at once. >>>>> Instead, the implementation is designed to read the data in configured >>>>> blocks rather than relying on a single read. *This behavior can be >>>>> refined/validated* to make it more robust.
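As a rough illustration of the blocked-read behavior described above (not the POC's actual code; the function name, chunk-handling callback, and block size are made up), a reducer can issue bounded positioned reads against its slice of the consolidated file, so a broken connection only costs re-reading the current chunk rather than the whole file:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Read [offset, offset + length) of a consolidated file in fixed-size blocks.
    def readInBlocks(file: Path, offset: Long, length: Long, blockSize: Int,
                     hadoopConf: Configuration)(handleChunk: (Array[Byte], Int) => Unit): Unit = {
      val fs = FileSystem.get(file.toUri, hadoopConf)
      val in = fs.open(file)
      val buf = new Array[Byte](blockSize)
      try {
        var pos = offset
        val end = offset + length
        while (pos < end) {
          val toRead = math.min(blockSize.toLong, end - pos).toInt
          in.readFully(pos, buf, 0, toRead) // positioned read; each chunk can be retried independently
          handleChunk(buf, toRead)
          pos += toRead
        }
      } finally in.close()
    }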
>>>>> *Wenchen,* >>>>> I fully agree that the operational details around using cloud storage >>>>> for shuffle (specifically traffic throttling, cleanup guarantees, >>>>> and overall request-related network cost) are critical issues >>>>> that must be solved. >>>>> The consolidation stage is explicitly designed to mitigate the >>>>> throttling and accompanying cost issues. >>>>> *Throttling* - By consolidating shuffle data, this approach >>>>> transforms the read pattern from a multitude of small, random requests into >>>>> fewer, large, targeted ones. This is particularly beneficial for modern cloud >>>>> object storage. >>>>> *Shuffle cleanup* - I am actively trying to leverage the shuffle >>>>> cleanup modes and am also making an effort to make them more robust. These cleanup >>>>> improvements should be beneficial regardless of this proposal and cover >>>>> most cases. >>>>> However, I agree that to ensure *no orphaned files* remain, we will >>>>> still require other means (such as remote storage lifecycle policies or >>>>> job-specific scripts) for guaranteed cleanup. >>>>> Thank you again for your valuable feedback, especially the validation >>>>> of synchronous scheduling and AQE integration. >>>>> >>>>> *Murali,* >>>>> >>>>>> *Doing an explicit sort stage* >>>>> >>>>> To clarify, ShuffleVault does not introduce an explicit sort stage. >>>>> Instead, it introduces a Shuffle Consolidation Stage. >>>>> This stage is a pure passthrough operation that only aggregates >>>>> scattered shuffle data for a given reducer partition. >>>>> In simple terms, it functions as an additional reducer stage that >>>>> reads the fragmented shuffle files from the mappers and writes them as a >>>>> single, consolidated, durable file to remote storage. >>>>> >>>>> *but that would be a nontrivial change* >>>>> >>>>> I agree that the change is significant, and I am actively working to >>>>> ensure the benefits are leveraged across the stack. This PR >>>>> <https://github.com/apache/spark/pull/53028> demonstrates integration >>>>> with AQE and interactions with other rules (Exchange reuse, Shuffle >>>>> Partition Coalescing, etc.). >>>>> I would genuinely appreciate it if you could take a look at the POC PR >>>>> to see the scope of changes. The primary logic is encapsulated within a >>>>> new Spark Physical Planner Rule >>>>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6> >>>>> that injects the consolidation stage, which is the main crux. >>>>> >>>>> I welcome any further questions or comments! >>>>> >>>>> Thanks & Regards >>>>> Karuppayya
>>>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]> >>>>> wrote: >>>>>> >>>>>> There are existing Apache projects which provide the capabilities >>>>>> which largely address the problem statement - Apache Celeborn, Apache >>>>>> Uniffle, Zeus, etc. >>>>>> Doing an explicit sort stage, between "map" and "reduce" brings with >>>>>> it some nice advantages, especially if the output is durable, but that >>>>>> would be a nontrivial change - and should be attempted if the benefits are >>>>>> being leveraged throughout the stack (AQE, speculative execution, etc.) >>>>>> >>>>>> Regards, >>>>>> Mridul >>>>>> >>>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Hi Karuppayya, >>>>>>> >>>>>>> Handling large shuffles in Spark is challenging and it's great to >>>>>>> see proposals addressing it. I think the extra "shuffle consolidation >>>>>>> stage" is a good idea, and now I feel it's better for it to be >>>>>>> synchronous, >>>>>>> so that we can integrate it with AQE and leverage the accurate runtime >>>>>>> shuffle status to make decisions about whether or not to launch this >>>>>>> extra >>>>>>> "shuffle consolidation stage" and how to consolidate. This is a key >>>>>>> differentiator compared to the push-based shuffle. >>>>>>> >>>>>>> However, there are many details to consider, and in general it's >>>>>>> difficult to use cloud storage for shuffle. We need to deal with >>>>>>> problems >>>>>>> like traffic throttling, cleanup guarantee, cost control, and so on. >>>>>>> Let's >>>>>>> take a step back and see what the actual problems of large shuffles are. >>>>>>> >>>>>>> Large shuffle usually starts with a large number of mappers that we >>>>>>> can't adjust (e.g. large table scan). We can adjust the number of >>>>>>> reducers >>>>>>> to reach two goals: >>>>>>> 1. The input data size of each reducer shouldn't be too large, which >>>>>>> is roughly *total_shuffle_size / num_reducers*. This is to avoid >>>>>>> spilling/OOM during reducer task execution. >>>>>>> 2. The data size of each shuffle block shouldn't be too small, which >>>>>>> is roughly *total_shuffle_size / (num_mappers * num_reducers)*. >>>>>>> This is for the good of disk/network IO. >>>>>>> >>>>>>> These two goals are actually contradictory and sometimes we have to >>>>>>> prioritize goal 1 (i.e. pick a large *num_reducers*) so that the >>>>>>> query can finish. An extra "shuffle consolidation stage" can kind of >>>>>>> decrease the number of mappers, by merging the shuffle files from >>>>>>> multiple >>>>>>> mappers. This can be a clear win as fetching many small shuffle blocks >>>>>>> can >>>>>>> be quite slow, even slower than running an extra "shuffle consolidation >>>>>>> stage". >>>>>>> >>>>>>> In addition, the nodes that host shuffle files shouldn't be too many >>>>>>> (best to be 0 which means shuffle files are stored in a different >>>>>>> storage). >>>>>>> With a large number of mappers, likely every node in the cluster stores >>>>>>> some shuffle files. By merging shuffle files via the extra "shuffle >>>>>>> consolidation stage", we can decrease the number of nodes that host >>>>>>> active >>>>>>> shuffle data, so that the cluster is more elastic. >>>>>>> >>>>>>> >>>>>>> Thanks, >>>>>>> Wenchen
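To make Wenchen's two sizing constraints concrete, here is a small worked example with made-up numbers:

    // 10 TiB of shuffle data written by 40,000 mappers (illustrative figures only).
    val totalShuffleBytes = 10L * 1024 * 1024 * 1024 * 1024
    val numMappers = 40000L

    val numReducers = 20000L
    val reducerInputBytes = totalShuffleBytes / numReducers                 // ~0.5 GiB per reducer: fine for goal 1
    val blockBytes = totalShuffleBytes / (numMappers * numReducers)         // ~13 KB per block: far too small for goal 2

    // After a consolidation stage merges each reducer partition, a reducer issues a handful of
    // large ranged reads (e.g. ~64 MiB each) instead of 40,000 tiny block fetches:
    val readsPerReducerBefore = numMappers                                  // 40,000 fetches per reducer
    val readsPerReducerAfter = reducerInputBytes / (64L * 1024 * 1024) + 1  // ~9 ranged reads per reducer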
>>>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]> >>>>>>> wrote: >>>>>>>> Hi Karuppayya, >>>>>>>> Thanks for sharing the proposal; this looks very exciting. >>>>>>>> I have a few questions, and please correct me if I misunderstood anything. >>>>>>>> >>>>>>>> Would it be possible to clarify whether the consolidated shuffle >>>>>>>> file produced for each partition is suitable across all storage systems, >>>>>>>> especially when this file becomes extremely large? I am wondering if a very >>>>>>>> large file could create problems during retrieval. For example, if a >>>>>>>> connection breaks while reading the file, some storage systems may not >>>>>>>> support resuming reads from the point of failure and would start reading the file >>>>>>>> from the beginning again. Could this lead to higher latency, repeated >>>>>>>> retries, or performance bottlenecks when a partition becomes too large or >>>>>>>> skewed? >>>>>>>> >>>>>>>> Would it make sense to introduce a configurable upper bound on the >>>>>>>> maximum allowed file size? This might prevent the file from growing >>>>>>>> massively. >>>>>>>> Should the consolidated shuffle file be compressed before being >>>>>>>> written to the storage system? Compression might introduce additional >>>>>>>> latency, but that too could be a configurable option. >>>>>>>> >>>>>>>> Regards, >>>>>>>> Rishab Joshi >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Enrico, >>>>>>>>> Thank you very much for reviewing the doc. >>>>>>>>> >>>>>>>>> *Since the consolidation stage reads all the shuffle data, why not >>>>>>>>>> do the transformation in that stage? What is the point of >>>>>>>>>> deferring the >>>>>>>>>> transformations to another stage?* >>>>>>>>> >>>>>>>>> >>>>>>>>> The reason for deferring the final consolidation to a subsequent >>>>>>>>> stage lies in the distributed nature of shuffle data. >>>>>>>>> A reducer requires reading all corresponding shuffle data written >>>>>>>>> across all map tasks. Since each mapper only holds its own local output, >>>>>>>>> the consolidation cannot begin until the entire map stage completes. >>>>>>>>> >>>>>>>>> However, your question is also aligned with one of the approaches >>>>>>>>> mentioned (concurrent consolidation >>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>), >>>>>>>>> which was specifically considered. >>>>>>>>> While synchronous consolidation happens after all the data is >>>>>>>>> available, concurrent consolidation can begin aggregating and persisting the >>>>>>>>> already-generated shuffle data concurrently with the remaining map >>>>>>>>> tasks, thereby making the shuffle durable much earlier instead of having to >>>>>>>>> wait for all map tasks to complete. >>>>>>>>> >>>>>>>>> - Karuppayya >>>>>>>>> >>>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> another remark regarding a remote shuffle storage solution: >>>>>>>>>> As long as the map executors are alive, reduce executors should >>>>>>>>>> read from them to avoid any extra delay / overhead. >>>>>>>>>> On fetch failure from a map executor, the reduce executors should >>>>>>>>>> fall back to a remote storage that provides a copy (merged or not) of the >>>>>>>>>> shuffle data. >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Enrico >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 13.11.25 at 09:42, Enrico Minack wrote: >>>>>>>>>> >>>>>>>>>> Hi Karuppayya, >>>>>>>>>> >>>>>>>>>> Thanks for your proposal and for bringing up this issue.
>>>>>>>>>> >>>>>>>>>> I am very much in favour of a shuffle storage solution that >>>>>>>>>> allows for dynamic allocation and tolerates node failure in a K8S environment, >>>>>>>>>> without the burden of managing a Remote Shuffle Service. >>>>>>>>>> >>>>>>>>>> I have the following comments: >>>>>>>>>> >>>>>>>>>> Your proposed consolidation stage is equivalent to the next >>>>>>>>>> reducer stage in the sense that it reads shuffle data from the earlier map >>>>>>>>>> stage. This requires the executors of the map stage to survive until the >>>>>>>>>> shuffle data are consolidated ("merged" in Spark terminology). Therefore, I >>>>>>>>>> think this passage of your design document is not accurate: >>>>>>>>>> >>>>>>>>>> Executors that perform the initial map tasks (shuffle >>>>>>>>>> writers) can be immediately deallocated after writing their shuffle >>>>>>>>>> data ... >>>>>>>>>> >>>>>>>>>> Since the consolidation stage reads all the shuffle data, why not >>>>>>>>>> do the transformation in that stage? What is the point of >>>>>>>>>> deferring the >>>>>>>>>> transformations to another stage? >>>>>>>>>> >>>>>>>>>> You mention the "Native Shuffle Block Migration" and say its >>>>>>>>>> limitation is "It simply shifts the storage burden to other active >>>>>>>>>> executors". >>>>>>>>>> Please consider that the migration process can migrate to what >>>>>>>>>> Spark calls a fallback storage, which essentially copies the shuffle data >>>>>>>>>> to a remote storage. >>>>>>>>>> Kind regards, >>>>>>>>>> Enrico >>>>>>>>>> >>>>>>>>>> On 13.11.25 at 01:40, karuppayya wrote: >>>>>>>>>> >>>>>>>>>> Hi All, >>>>>>>>>> >>>>>>>>>> I propose to utilize *Remote Storage as a Shuffle >>>>>>>>>> Store, natively in Spark*. >>>>>>>>>> >>>>>>>>>> This approach would fundamentally decouple shuffle storage from >>>>>>>>>> compute nodes, mitigating *shuffle fetch failures and also helping >>>>>>>>>> with aggressive downscaling*. >>>>>>>>>> >>>>>>>>>> The primary goal is to enhance the *elasticity and resilience* >>>>>>>>>> of Spark workloads, leading to substantial cost optimization >>>>>>>>>> opportunities. >>>>>>>>>> >>>>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.* >>>>>>>>>> *Looking forward to your feedback!* >>>>>>>>>> >>>>>>>>>> JIRA: SPARK-53484 >>>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327> >>>>>>>>>> SPIP doc >>>>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>, >>>>>>>>>> Design doc >>>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0> >>>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028> >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Karuppayya >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Regards >>>>>>>> Rishab Joshi >>>>>>> >>>
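For reference, the existing fallback-storage mechanism Enrico mentions (native shuffle block migration) is driven by the executor decommissioning settings below in recent Spark releases; exact behavior depends on the Spark version, and it only kicks in when executors are decommissioned rather than acting as a general-purpose shuffle store, which is the gap this proposal targets:

    import org.apache.spark.SparkConf

    // Existing decommission/fallback-storage configs, set at application submit time.
    val conf = new SparkConf()
      .set("spark.decommission.enabled", "true")
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
      // remote copy that reducers fall back to once the original executor is gone:
      .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/fallback/")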
