Great feedback from many folks above, especially Wenchen's idea of AQE
integration.

Many folks here have participated in Spark shuffle discussions over many
years. Why hasn't anybody proposed this before? What was the blind spot?
For me, I think I have focused too much on the disaggregation of shuffle
storage from compute and allocating as little local disk as possible on a
container, without seeing the huge benefit of not having a separate service
to manage.



On Fri, Dec 5, 2025 at 6:25 PM Chao Sun <[email protected]> wrote:

> I also feel the shuffle consolidation stage idea itself is interesting and
> can serve as an improvement on the existing shuffle mechanism in Spark.
> From that perspective, I'm supportive. On the other hand, the remote
> storage part seems largely orthogonal to this and can be built separately
> on top of it. Should we emphasize this more as opposed to the title "Remote
> Storage as a Shuffle Store"?
>
> Best,
> Chao
>
> On Fri, Dec 5, 2025 at 2:15 PM karuppayya <[email protected]>
> wrote:
>
>> Qiegang,
>> Thanks for the feedback.
>>
>> *Chao sent an email with a similar thought to yours (which I got to
>> know offline).* But somehow it seems to have been lost.
>> (Also, the Apache ponymail thread
>> <https://lists.apache.org/thread/d49b3w8s4t3ob6kgyrtxkc7p6qbl3ssd> is
>> missing a few responses, e.g. my last response and Chao's email. Please let
>> me know who needs to be notified of this.)
>>
>> *I completely agree with the suggestion to take an incremental approach
>> here.*
>>
>> *For large/skewed partitions,*
>>
>> While memory is bounded in my POC (via the ShuffleBlockFetcherIterator
>> settings), I agree that we need to think through the implications for
>> local storage, as it presents different disk-related challenges than
>> remote storage.
>>
>> *For the remote storage phase, would it support streaming writes*
>>
>> I’ve ensured that writes are streamed
>> <https://github.com/apache/spark/pull/53028/files#diff-eb253db2e3342a7044aa974c0b9f51ac1c4a5d4e1eb390da8bf1351129f514c4R60>
>> in the POC PR. Please verify this when you get a chance to look at the code.
>>
>> *Having this native in Spark keeps things simpler*
>>
>> I also want to highlight why integrating this directly into Spark is
>> valuable.
>> Since many downstream systems (like Gluten/Velox) rely on the Spark
>> Physical Plan, exposing this information there would help them leverage
>> these capabilities automatically. This ensures out-of-the-box compatibility
>> and eliminates the need for ecosystem projects to implement any additional
>> logic for external systems.
>>
>> Mridul, Yang, Wenchen, Chao (and everyone),
>> I would appreciate your thoughts and feedback on this proposal. Your
>> inputs are critical.
>>
>> Thanks & Regards
>> Karuppayya
>>
>> On Fri, Dec 5, 2025 at 12:23 PM Qiegang Long <[email protected]> wrote:
>>
>>> Hi Karuppayya,
>>>
>>> Thanks for putting this together - I've been following the discussion on
>>> this interesting topic.
>>>
>>> I really like the simplicity of not needing an external shuffle service.
>>> Push-based shuffle works, but managing ESS adds operational complexity.
>>> Having this native in Spark keeps things simpler, and it would be an ops
>>> win.
>>>
>>> Wenchen raised an interesting point about AQE integration being a key
>>> differentiator. I agree that having Spark dynamically decide whether
>>> consolidation is worth it based on actual runtime stats could be really
>>> powerful.
>>>
>>> This got me thinking: what if we approach this incrementally?
>>>
>>> *Start with local consolidation + AQE* - prove out the consolidation
>>> stage concept and show it helps with IO efficiency. This is simpler to
>>> implement and we can measure the impact on real workloads without dealing
>>> with remote storage complexity yet.
>>>
>>> *Then add remote storage *- once we've proven the consolidation stage
>>> is beneficial, layer in the elasticity benefits of remote storage that your
>>> proposal focuses on.
>>>
>>> Wenchen mentioned the consolidation stage can be valuable even without
>>> remote storage, and that feels right to me. It would also help address
>>> Mridul and Yang's concerns about overlap with existing projects - if we can
>>> show the AQE-integrated consolidation has unique benefits first, it makes
>>> the case stronger.
>>>
>>> A few implementation questions I'm curious about:
>>>
>>> - For large/skewed partitions, could we split consolidation across
>>> multiple tasks? Like if partition X is 5GB, maybe 5 consolidators each
>>> handle 1GB worth of mapper outputs. Still way better than fetching from
>>> thousands of mappers, and it keeps disk/memory bounded per task.
>>> - For the remote storage phase, would it support streaming writes (S3
>>> multipart) to avoid holding huge merged partitions in memory/disk?
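
A minimal sketch of the splitting idea above, assuming a simple greedy grouping of mapper outputs into size-bounded consolidation tasks. The function name `plan_consolidation_tasks`, the per-task byte budget, and the example sizes are hypothetical and are not taken from the POC.

```python
from typing import List, Tuple


def plan_consolidation_tasks(
    block_sizes: List[int], max_bytes_per_task: int
) -> List[List[Tuple[int, int]]]:
    """Group the (mapper_id, size) blocks of one reducer partition into tasks.

    Blocks are taken in mapper order. A new task starts whenever adding the
    next block would exceed the budget. A single oversized block still gets
    a task of its own.
    """
    tasks: List[List[Tuple[int, int]]] = []
    current: List[Tuple[int, int]] = []
    current_bytes = 0
    for mapper_id, size in enumerate(block_sizes):
        if current and current_bytes + size > max_bytes_per_task:
            tasks.append(current)
            current, current_bytes = [], 0
        current.append((mapper_id, size))
        current_bytes += size
    if current:
        tasks.append(current)
    return tasks


# Hypothetical partition X: 5,000 mapper outputs of 1 MiB each,
# with a 1 GiB budget per consolidation task.
MIB = 1024 * 1024
tasks = plan_consolidation_tasks([MIB] * 5000, 1024 * MIB)
print(len(tasks))  # 5 tasks instead of one 5,000-block fetch
```

Each reducer would then read a handful of consolidated files rather than thousands of mapper blocks, while disk and memory stay bounded per task.
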
>>>
>>> Your proposal addresses a real problem - large shuffles with millions of
>>> small blocks are painful in production, but I think it would be more useful
>>> if it is integrated with AQE. The incremental approach might be a pragmatic
>>> way to deliver value sooner.
>>>
>>>
>>> On Wed, Dec 3, 2025 at 3:52 PM karuppayya <[email protected]>
>>> wrote:
>>>
>>>> Wenchen,
>>>> Thanks for being supportive of the idea.
>>>> I think I missed addressing some parts of your first email.
>>>>
>>>> *we can decrease the number of nodes that host active shuffle data, so
>>>>> that the cluster is more elastic.*
>>>>
>>>>
>>>> - We can write to local storage since we are using the Hadoop API.
>>>> - We could optimize task placement to run on hosts containing active
>>>> shuffle data.
>>>> - However, this poses a risk of disks filling up sooner (especially
>>>> with large shuffles), leading to task failures. This issue is amplified in
>>>> multi-user/session environments (e.g., Spark Connect) where unrelated jobs
>>>> might fill the disk, causing confusing diagnostic issues.
>>>> - *I propose offering this as an optional setting/config that
>>>> knowledgeable users can enable, particularly in environments with
>>>> sufficient local storage capacity.*
>>>>
>>>> *push-based shuffle framework to support other deployments (standalone,
>>>>> k8s) and remote storage?*
>>>>
>>>>
>>>> - Since the existing framework is YARN-dependent, retrofitting it for
>>>> other resource managers would require extensive logic for handling resource
>>>> management and shuffle service deployment differences.
>>>> - I prefer keeping the consolidation stage separate for a cleaner, more
>>>> generalized design.
>>>> *- However, if a significant architectural overlap becomes apparent,
>>>> then integration should certainly be reconsidered.*
>>>>
>>>>
>>>> Hi Enrico,
>>>> Thanks for the ideas.
>>>> I think double writes (either as a direct write or a copy after the local
>>>> write) would:
>>>> - Increase overhead and add more failure points.
>>>> - Keep shuffle data on executors, which would prevent aggressive cluster
>>>> downscaling.
>>>> *While we have slightly lower latency, I think the delta between local
>>>> and remote reads is small compared to overall job time.*
>>>> For small jobs, it would be beneficial to bypass the shuffle
>>>> consolidation stage dynamically at runtime to avoid unnecessary overhead.
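
A minimal sketch of such a runtime bypass, assuming the decision is driven only by the block count and the average block size seen after the map stage. The function name `should_consolidate` and both thresholds are hypothetical; they are not Spark configs and not part of the POC.

```python
def should_consolidate(
    total_shuffle_bytes: int,
    num_mappers: int,
    num_reducers: int,
    min_blocks: int = 10_000,           # hypothetical threshold
    small_block_bytes: int = 1 << 20,   # hypothetical threshold (1 MiB)
) -> bool:
    """Return True only when the shuffle has many blocks that are small."""
    num_blocks = num_mappers * num_reducers
    if num_blocks == 0:
        return False
    avg_block_bytes = total_shuffle_bytes / num_blocks
    return num_blocks >= min_blocks and avg_block_bytes < small_block_bytes


GIB = 1 << 30
# Small job: 100 blocks in total, so the extra stage is skipped.
print(should_consolidate(1 * GIB, num_mappers=10, num_reducers=10))
# Large job: 20 million blocks of roughly 55 KB each, so it is worth it.
print(should_consolidate(1024 * GIB, num_mappers=10_000, num_reducers=2_000))
```
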
>>>>
>>>> * Is "shuffle consolidation" the preferred term?*
>>>>
>>>>
>>>> While the existing Spark term is "shuffle merge," which also involves
>>>> combining shuffle blocks, I am using "shuffle consolidation" mainly for
>>>> disambiguation.
>>>> *- Shuffle merge* - combining a few shuffle blocks
>>>> *- Shuffle consolidation* - merging all fragmented shuffle blocks for a
>>>> given reducer partition
>>>> *However, I don't have a strong opinion on which term is ultimately
>>>> used, as long as the function is clearly understood.*
>>>>
>>>> Regards
>>>> Karuppayya
>>>>
>>>>
>>>> On Wed, Dec 3, 2025 at 2:37 AM Enrico Minack <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Karuppayya,
>>>>>
>>>>> Thanks for the clarification.
>>>>>
>>>>> I would like to emphasize that it would be great to have a solution that
>>>>> prefers shuffle data from executors over remote storage:
>>>>> If the shuffle consolidation*) stage merges mapper outputs into
>>>>> reducer inputs and stores those on a remote storage, it could easily keep
>>>>> a copy on the consolidating executors. For the lifetime of these
>>>>> executors, reducer executors could preferably fetch the consolidated
>>>>> shuffle data from the consolidating executors, with the obvious benefits.
>>>>> Only for decommissioned consolidation executors or consolidation executors
>>>>> on failed nodes would reducer executors fetch consolidated shuffle data
>>>>> from the remote storage.
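
A minimal sketch of the executor-first read path described above, assuming the two fetch paths are passed in as plain callables. The function and parameter names are hypothetical and do not correspond to any existing Spark API.

```python
from typing import Callable


def fetch_consolidated(
    partition_id: int,
    fetch_from_executor: Callable[[int], bytes],
    fetch_from_remote: Callable[[int], bytes],
    executor_alive: bool,
) -> bytes:
    """Prefer the copy on the consolidating executor, else use remote storage."""
    if executor_alive:
        try:
            return fetch_from_executor(partition_id)
        except OSError:
            # The executor was decommissioned or its node failed mid-fetch:
            # fall through to the durable remote copy.
            pass
    return fetch_from_remote(partition_id)
```

The remote copy is only touched when the local one is unavailable, which is the same shape as the Fallback Storage behaviour mentioned in this message.
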
>>>>>
>>>>> Further, splitting the proposed shuffle consolidation stage into:
>>>>> 1) consolidate shuffle data to executor local shuffle storage
>>>>> (AQE-based shuffle consolidation stage as proposed by Wenchen Fan)
>>>>> 2) replicate local shuffle storage to remote shuffle storage
>>>>> feels like a natural separation of concerns. And it allows for custom
>>>>> configuration to employ only the former without the latter or vice versa.
>>>>>
>>>>> The ability to read shuffle data from executors during their existence,
>>>>> while falling back to the remote storage replica, is reminiscent of the
>>>>> existing Fallback Storage logic. Feature 2) could be implemented by
>>>>> evolving existing infrastructure (Fallback Storage) with only a hundred
>>>>> lines of code. The read-from-executors-first-then-remote-storage feature
>>>>> would cost a few more hundred lines of code.
>>>>>
>>>>> Cheers,
>>>>> Enrico
>>>>>
>>>>>
>>>>> *) Is "shuffle consolidation" the preferred term? Isn't the existing
>>>>> Spark term "shuffle merge" exactly what is being described here, maybe
>>>>> with some small extension?
>>>>>
>>>>>
>>>>> On 01.12.25 at 20:30, karuppayya wrote:
>>>>>
>>>>> Hi everyone,
>>>>> Thank you all for your valuable comments and discussion on the design
>>>>> document/this email. I have replied to the comments/concerns raised.
>>>>> I welcome any other questions and further challenges.
>>>>>
>>>>> Also, *Sun Chao* agreed to shepherd this proposal (thank you!)
>>>>>
>>>>> If there are no other open questions by Wednesday morning (PST), I
>>>>> will request Chao to open the official voting thread (which should give
>>>>> 72 hours for the process
>>>>> <https://spark.apache.org/improvement-proposals.html>).
>>>>>
>>>>> - Karuppayya
>>>>>
>>>>> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> One aspect that hasn’t been mentioned yet (or so I think) is the
>>>>>> thread-level behavior of shuffle. In large workloads with many small
>>>>>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of
>>>>>> threads tied to shuffle fetch operations, Netty client handlers, and
>>>>>> block file access.
>>>>>> Since the proposal changes block granularity and fetch patterns, it
>>>>>> would be valuable to explicitly consider how the consolidation stage
>>>>>> affects:
>>>>>> – the number of concurrent fetch operations
>>>>>> – thread pool growth / saturation
>>>>>> – Netty transport threads
>>>>>> – memory pressure from large in-flight reads
>>>>>>
>>>>>> Btw, I find your proposal quite interesting.
>>>>>>
>>>>>> On Tue, Nov 18, 2025, 19:33, karuppayya <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Rishab, Wenchen, Murali,
>>>>>>> Thank you very much for taking the time to review the proposal and
>>>>>>> for providing such thoughtful and insightful comments/questions.
>>>>>>>
>>>>>>> *Rishab*,
>>>>>>>
>>>>>>>> * suitable across all storage systems*
>>>>>>>
>>>>>>> You are right that the suitability is somewhat subjective and
>>>>>>> dependent on the cloud provider used.
>>>>>>> In general, the goal of ShuffleVault is to utilize the standard
>>>>>>> Hadoop FileSystem APIs, which means it should work seamlessly with
>>>>>>> popular cloud and distributed file systems (like S3, HDFS, GFS, etc.).
>>>>>>> These systems share a similar nature and are designed for large
>>>>>>> files.
>>>>>>>
>>>>>>> *large file could create problems during retrieval.*
>>>>>>>
>>>>>>> We are mitigating this risk by ensuring that tasks do not read the
>>>>>>> entire consolidated file at once.
>>>>>>> Instead, the implementation is designed to read the data in
>>>>>>> blocks of a configured size, rather than relying on a single read. *This
>>>>>>> behavior can be refined/validated* to make it more robust.
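
A minimal sketch of reading a consolidated file in bounded blocks, assuming a seekable byte stream. An in-memory `io.BytesIO` stands in for the remote file here; the helper name `read_in_blocks` and the block size are hypothetical and are not the POC's actual reader.

```python
import io
from typing import BinaryIO, Iterator


def read_in_blocks(
    stream: BinaryIO, offset: int, length: int, block_size: int
) -> Iterator[bytes]:
    """Yield the byte range [offset, offset + length) in bounded chunks."""
    stream.seek(offset)
    remaining = length
    while remaining > 0:
        chunk = stream.read(min(block_size, remaining))
        if not chunk:  # the stream ended early
            break
        remaining -= len(chunk)
        yield chunk


# Stand-in for a consolidated shuffle file of 100 bytes.
consolidated = io.BytesIO(bytes(range(100)))
chunks = list(read_in_blocks(consolidated, offset=10, length=50, block_size=16))
print([len(c) for c in chunks])  # [16, 16, 16, 2]
```

Because each read covers a known offset and length, a failed read only needs to retry from the start of the current block rather than from the start of the file.
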
>>>>>>>
>>>>>>> *Wenchen,*
>>>>>>> I fully agree that the operational details around using cloud
>>>>>>> storage for shuffle (specifically traffic throttling, cleanup guarantees,
>>>>>>> and overall request-related network cost) are critical issues
>>>>>>> that must be solved.
>>>>>>> The consolidation stage is explicitly designed to mitigate the
>>>>>>> throttling and accompanying cost issues.
>>>>>>> *Throttling* - By consolidating shuffle data, this approach
>>>>>>> transforms the read pattern from a multitude of small, random requests
>>>>>>> into fewer, large, targeted ones. This is particularly beneficial for
>>>>>>> modern cloud object storage.
>>>>>>> *Shuffle cleanup* - I am actively trying to leverage the shuffle
>>>>>>> cleanup mode and am also working to make it more robust. These cleanup
>>>>>>> improvements should be beneficial regardless of this proposal and cover
>>>>>>> most cases.
>>>>>>> However, I agree that to ensure *no orphaned files* remain, we will
>>>>>>> still require other means (such as remote storage lifecycle policies or
>>>>>>> job-specific scripts) for a guaranteed cleanup.
>>>>>>> Thank you again for your valuable feedback, especially the
>>>>>>> validation on synchronous scheduling and AQE integration.
>>>>>>>
>>>>>>> *Murali,*
>>>>>>>
>>>>>>>> * Doing an explicit sort stage*
>>>>>>>
>>>>>>> To clarify, ShuffleVault does not introduce an explicit sort stage.
>>>>>>> Instead, it introduces a Shuffle Consolidation Stage.
>>>>>>> This stage is a pure passthrough operation that only aggregates
>>>>>>> scattered shuffle data for a given reducer partition.
>>>>>>> In simple terms, it functions as an additional reducer stage that
>>>>>>> reads the fragmented shuffle files from the mappers and writes them as a
>>>>>>> single, consolidated, durable file to remote storage.
>>>>>>>
>>>>>>> *but that would be a nontrivial change *
>>>>>>>
>>>>>>> I agree that the change is significant. I am actively working to
>>>>>>> ensure the benefits are leveraged across the stack. This PR
>>>>>>> <https://github.com/apache/spark/pull/53028> demonstrates
>>>>>>> integration with AQE and interactions with other rules (Exchange reuse,
>>>>>>> Shuffle Partition Coalescing, etc.).
>>>>>>> I would genuinely appreciate it if you could take a look at the POC
>>>>>>> PR to see the scope of changes. The primary logic is encapsulated
>>>>>>> within a new Spark Physical Planner Rule
>>>>>>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
>>>>>>> that injects the consolidation stage, which is the main crux.
>>>>>>>
>>>>>>> I welcome any further questions or comments!
>>>>>>>
>>>>>>> Thanks & Regards
>>>>>>> Karuppayya
>>>>>>>
>>>>>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> There are existing Apache projects which provide capabilities
>>>>>>>> which largely address the problem statement - Apache Celeborn, Apache
>>>>>>>> Uniffle, Zeus, etc.
>>>>>>>> Doing an explicit sort stage between "map" and "reduce" brings
>>>>>>>> with it some nice advantages, especially if the output is durable, but
>>>>>>>> that would be a nontrivial change - and should be attempted if the
>>>>>>>> benefits are being leveraged throughout the stack (AQE, speculative
>>>>>>>> execution, etc.)
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Mridul
>>>>>>>>
>>>>>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Karuppayya,
>>>>>>>>>
>>>>>>>>> Handling large shuffles in Spark is challenging and it's great to
>>>>>>>>> see proposals addressing it. I think the extra "shuffle consolidation
>>>>>>>>> stage" is a good idea, and now I feel it's better for it to be
>>>>>>>>> synchronous, so that we can integrate it with AQE and leverage the
>>>>>>>>> accurate runtime shuffle status to make decisions about whether or not
>>>>>>>>> to launch this extra "shuffle consolidation stage" and how to
>>>>>>>>> consolidate. This is a key differentiator compared to the push-based
>>>>>>>>> shuffle.
>>>>>>>>>
>>>>>>>>> However, there are many details to consider, and in general it's
>>>>>>>>> difficult to use cloud storage for shuffle. We need to deal with
>>>>>>>>> problems like traffic throttling, cleanup guarantee, cost control, and
>>>>>>>>> so on. Let's take a step back and see what are the actual problems of
>>>>>>>>> large shuffles.
>>>>>>>>>
>>>>>>>>> Large shuffle usually starts with a large number of mappers that
>>>>>>>>> we can't adjust (e.g. large table scan). We can adjust the number of
>>>>>>>>> reducers to reach two goals:
>>>>>>>>> 1. The input data size of each reducer shouldn't be too large,
>>>>>>>>> which is roughly *total_shuffle_size / num_reducers*. This is to
>>>>>>>>> avoid spilling/OOM during reducer task execution.
>>>>>>>>> 2. The data size of each shuffle block shouldn't be too small,
>>>>>>>>> which is roughly *total_shuffle_size / (num_mappers *
>>>>>>>>> num_reducers)*. This is for the good of disk/network IO.
>>>>>>>>>
>>>>>>>>> These two goals are actually contradictory and sometimes we have
>>>>>>>>> to prioritize goal 1 (i.e. pick a large *num_reducers*) so that
>>>>>>>>> the query can finish. An extra "shuffle consolidation stage" can kind
>>>>>>>>> of decrease the number of mappers, by merging the shuffle files from
>>>>>>>>> multiple mappers. This can be a clear win as fetching many small
>>>>>>>>> shuffle blocks can be quite slow, even slower than running an extra
>>>>>>>>> "shuffle consolidation stage".
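
A worked example of the two formulas above, as a rough sketch. The shuffle size, mapper count, and reducer count are made-up numbers chosen only to show how the two goals pull against each other; `shuffle_shape` is a hypothetical helper.

```python
def shuffle_shape(total_shuffle_bytes: int, num_mappers: int, num_reducers: int):
    """Return (input bytes per reducer, bytes per shuffle block)."""
    reducer_input = total_shuffle_bytes / num_reducers
    block = total_shuffle_bytes / (num_mappers * num_reducers)
    return reducer_input, block


TIB = 1024 ** 4
# Hypothetical job: 10 TiB shuffled, 20,000 mappers, 10,000 reducers.
reducer_input, block = shuffle_shape(10 * TIB, 20_000, 10_000)
print(f"per-reducer input: {reducer_input / 1e9:.2f} GB")  # about 1.10 GB
print(f"per-block size:    {block / 1e3:.1f} KB")          # about 55.0 KB
print(f"total blocks:      {20_000 * 10_000:,}")           # 200,000,000
```

With these numbers goal 1 is met (about 1.1 GB per reducer), but every block is only about 55 KB and there are 200 million of them. A consolidation stage that merges all mapper outputs for a reducer would replace the 20,000 small fetches per reducer with one large sequential read.
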
>>>>>>>>>
>>>>>>>>> In addition, the nodes that host shuffle files shouldn't be too
>>>>>>>>> many (best to be 0 which means shuffle files are stored in a different
>>>>>>>>> storage). With a large number of mappers, likely every node in the
>>>>>>>>> cluster stores some shuffle files. By merging shuffle files via the
>>>>>>>>> extra "shuffle consolidation stage", we can decrease the number of
>>>>>>>>> nodes that host active shuffle data, so that the cluster is more
>>>>>>>>> elastic.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Wenchen
>>>>>>>>>
>>>>>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>
>>>>>>>>>> Thanks for sharing the proposal and this looks very exciting.
>>>>>>>>>>
>>>>>>>>>> I have a few questions and please correct me if I misunderstood
>>>>>>>>>> anything.
>>>>>>>>>>
>>>>>>>>>> Would it be possible to clarify whether the consolidated shuffle
>>>>>>>>>> file produced for each partition is suitable across all storage
>>>>>>>>>> systems, especially when this file becomes extremely large? I am
>>>>>>>>>> wondering if a very large file could create problems during retrieval.
>>>>>>>>>> For example, if a connection breaks while reading the file, some
>>>>>>>>>> storage systems may not support resuming reads from the point of
>>>>>>>>>> failure and may start reading the file from the beginning again. This
>>>>>>>>>> could lead to higher latency, repeated retries, or performance
>>>>>>>>>> bottlenecks when a partition becomes too large or skewed.
>>>>>>>>>>
>>>>>>>>>> Would it make sense to introduce a configurable upper bound on
>>>>>>>>>> the maximum allowed file size? This might prevent the file from
>>>>>>>>>> growing massively.
>>>>>>>>>> Should the consolidated shuffle file be compressed before being
>>>>>>>>>> written to the storage system? Compression might introduce additional
>>>>>>>>>> latency, but that too can be a configurable option.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Rishab Joshi
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Enrico,
>>>>>>>>>>> Thank you very much for reviewing the doc.
>>>>>>>>>>>
>>>>>>>>>>> *Since the consolidation stage reads all the shuffle data, why
>>>>>>>>>>>> not doing the transformation in that stage? What is the point in
>>>>>>>>>>>> deferring the transformations into another stage?*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The reason for deferring the final consolidation to a subsequent
>>>>>>>>>>> stage lies in the distributed nature of shuffle data.
>>>>>>>>>>> A reducer requires reading all corresponding shuffle data written
>>>>>>>>>>> across all map tasks. Since each mapper only holds its own local
>>>>>>>>>>> output, the consolidation cannot begin until the entire map stage
>>>>>>>>>>> completes.
>>>>>>>>>>>
>>>>>>>>>>> However, your question is also aligned to one of the approaches
>>>>>>>>>>> mentioned (concurrent consolidation
>>>>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
>>>>>>>>>>> which was specifically considered.
>>>>>>>>>>> While synchronous consolidation happens after all the data is
>>>>>>>>>>> available, concurrent consolidation can begin aggregating and
>>>>>>>>>>> persisting the already-generated shuffle data while the remaining map
>>>>>>>>>>> tasks are still running, thereby making the shuffle durable much
>>>>>>>>>>> earlier instead of having to wait for all map tasks to complete.
>>>>>>>>>>>
>>>>>>>>>>> - Karuppayya
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>>>>>>> As long as the map executors are alive, reduce executors should
>>>>>>>>>>>> read from them to avoid any extra delay / overhead.
>>>>>>>>>>>> On fetch failure from a map executor, the reduce executors
>>>>>>>>>>>> should fall back to a remote storage that provides a copy (merged
>>>>>>>>>>>> or not) of the shuffle data.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Enrico
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 13.11.25 at 09:42, Enrico Minack wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>>>
>>>>>>>>>>>> thanks for your proposal and bringing up this issue.
>>>>>>>>>>>>
>>>>>>>>>>>> I am very much in favour of a shuffle storage solution that
>>>>>>>>>>>> allows for dynamic allocation and node failure in a K8S environment,
>>>>>>>>>>>> without the burden of managing a Remote Shuffle Service.
>>>>>>>>>>>>
>>>>>>>>>>>> I have the following comments:
>>>>>>>>>>>>
>>>>>>>>>>>> Your proposed consolidation stage is equivalent to the next
>>>>>>>>>>>> reducer stage in the sense that it reads shuffle data from the
>>>>>>>>>>>> earlier map stage. This requires the executors of the map stage to
>>>>>>>>>>>> survive until the shuffle data are consolidated ("merged" in Spark
>>>>>>>>>>>> terminology). Therefore, I think this passage of your design
>>>>>>>>>>>> document is not accurate:
>>>>>>>>>>>>
>>>>>>>>>>>>     Executors that perform the initial map tasks (shuffle
>>>>>>>>>>>>     writers) can be immediately deallocated after writing their
>>>>>>>>>>>>     shuffle data ...
>>>>>>>>>>>>
>>>>>>>>>>>> Since the consolidation stage reads all the shuffle data, why
>>>>>>>>>>>> not doing the transformation in that stage? What is the point in
>>>>>>>>>>>> deferring the transformations into another stage?
>>>>>>>>>>>>
>>>>>>>>>>>> You mention the "Native Shuffle Block Migration" and say its
>>>>>>>>>>>> limitation is "It simply shifts the storage burden to other active
>>>>>>>>>>>> executors".
>>>>>>>>>>>> Please consider that the migration process can migrate to what
>>>>>>>>>>>> Spark calls a fallback storage, which essentially copies the
>>>>>>>>>>>> shuffle data to a remote storage.
>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>> Enrico
>>>>>>>>>>>>
>>>>>>>>>>>> On 13.11.25 at 01:40, karuppayya wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>  Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I propose to utilize *Remote Storage as a Shuffle
>>>>>>>>>>>> Store, natively in Spark* .
>>>>>>>>>>>>
>>>>>>>>>>>> This approach would fundamentally decouple shuffle storage from
>>>>>>>>>>>> compute nodes, mitigating *shuffle fetch failures and also
>>>>>>>>>>>> help with aggressive downscaling*.
>>>>>>>>>>>>
>>>>>>>>>>>> The primary goal is to enhance the *elasticity and resilience*
>>>>>>>>>>>> of Spark workloads, leading to substantial cost optimization
>>>>>>>>>>>> opportunities.
>>>>>>>>>>>>
>>>>>>>>>>>> *I welcome any initial thoughts or concerns regarding this
>>>>>>>>>>>> idea.*
>>>>>>>>>>>> *Looking forward to your feedback! *
>>>>>>>>>>>>
>>>>>>>>>>>> JIRA: SPARK-53484
>>>>>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>>>>>>> SPIP doc
>>>>>>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>>>>>>>>>> ,
>>>>>>>>>>>> Design doc
>>>>>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Karuppayya
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Regards
>>>>>>>>>> Rishab Joshi
>>>>>>>>>>
>>>>>>>>>
>>>>>

-- 
John Zhuge
