Hi Karuppayya,

Thanks for the clarification.

I would like to emphasize that a solution would be great that allows to prefer shuffle data from executors over remote storage: If the shuffle consolidation^*) stage merges mapper outputs into reducer inputs and stores those on a remote storage, it could easily keep a copy on the consolidating executors. For the lifetime of these executors, reducer executors could preferably fetch the consolidated shuffle data from the consolidating executors, with the obvious benefits. Only for decommissioned consolidation executors or consolidation executors on failed nodes, reducer executors fetch consolidated shuffle data from the remote storage.

Further, splitting the proposed shuffle consolidation stage into:
1) consolidate shuffle data to executor local shuffle storage (AQE-based shuffle consolidation stage as proposed by Wenchen Fan)
2) replicate local shuffle storage to remote shuffle storage
feels like a natural separation of concerns. And it allows for custom configuration to employ only the former without the latter or vice versa.

Speaking of the ability to read shuffle data from executors during their existence while falling back to the remote storage replica reminds of the existing Fallback Storage logic. Feature 2) could be implemented by evolving existing infrastructure (Fallback Storage) with only hundred lines of code. The read-from-executors-first-then-remote-storage feature would cost a few more hundred lines of code.

Cheers,
Enrico


^*) Is "shuffle consolidation" the preferred term? Isn't the existing Spark term "shuffle merge" exactly what is being described here, maybe with some small extension?


Am 01.12.25 um 20:30 schrieb karuppayya:
Hi everyone,
Thank you all for your valuable comments and discussion on the design document/this email. I have replied to the comments/concerns raised.
I welcome any other questions and to be challenged further.

Also *Sun Chao* accepted to shepherd this proposal(Thank you!)

If there are no other open questions by Wednesday morning (PST), I will request Chao to open the official voting thread (which should give 72 hours for the process <https://spark.apache.org/improvement-proposals.html>).

- Karuppayya

On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <[email protected]> wrote:

    One aspect that hasn’t been mentioned yet (or so I think) is the
    thread-level behavior of shuffle. In large workloads with many
    small shuffle blocks, I’ve repeatedly observed executors spawning
    hundreds of threads tied to shuffle fetch operations, Netty client
    handlers, and block file access.
    Since the proposal changes block granularity and fetch patterns,
    it would be valuable to explicitly consider how the consolidation
    stage affects:
    – the number of concurrent fetch operations
    – thread pool growth / saturation
    – Netty transport threads
    – memory pressure from large in-flight reads

    Btw, I find your proposal quite interesting.

    El mar, 18 nov 2025, 19:33, karuppayya <[email protected]>
    escribió:

        Rishab, Wenchen, Murali,
        Thank you very much for taking the time to review the proposal
        and for providing such thoughtful and insightful
        comments/questions.

        *Rishab*,

            /* suitable across all storage systems*/

        You are right that the suitability is somewhat subjective and
        dependent on cloud provider used.
        In general, the goal of ShuffleVault is to utilize the
        standard Hadoop FileSystem APIs, which means it should work
        seamlessly with popular cloud and distributed file systems
        (like S3, HDFS, GFS, etc.).
        These systems share a similar nature and are designed for
        large files.

            /*large file could create problems during retrieval.*/

        We are mitigating this risk by ensuring that  tasks do not
        read the entire consolidated file at once.
        Instead, the implementation is designed to read the data in
        configured blocks, rather than relying on a single read. *This
        behavior can be refined/validated* to make it more robust.

        *Wenchen,*
        I fully agree that the operational details around using cloud
        storage for shuffle—specifically traffic throttling, cleanup
        guarantees
        and overall request-related network cost — these are critical
        issues that must be solved.
        The consolidation stage is explicitly designed to mitigate the
        throttling and accompanying cost issues .
        *Throttling* - By consolidating shuffle data, this approach
        transforms the read pattern from a multitude of small, random
        requests into fewer, large, targeted ones. Particularly
        beneficial for modern cloud object storage.
        *Shuffle cleanup* -  I am actively trying to leverage the
        Shufffle clean up mode and also making an effort to make them
        robust .These cleanup improvements should be beneficial,
        regardless of this proposal and cover most cases.
        However, I agree that to ensure *no orphaned files* remain, we
        will still require other means (such as remote storage
        lifecycle policies or job-specific scripts) for a guaranteed
        cleanup.
        Thank you again for your valuable feedback, especially the
        validation on synchronous scheduling and AQE integration.

        *Murali,*

            */ Doing an explicit sort stage/*

        To clarify, ShuffleVault does not introduce an explicit sort
        stage. Instead, it introduces a Shuffle Consolidation Stage.
        This stage is a pure passthrough operation that only
        aggregates scattered shuffle data for a given reducer partition.
        In simple terms, it functions as an additional reducer stage
        that reads the fragmented shuffle files from the mappers and
        writes them as a single, consolidated, durable file to remote
        storage.

            *but that would be a nontrivial change *

        I agree that the change is significant, I am  actively working
        to ensure the benefits are leveraged across the stack. This PR
        <https://github.com/apache/spark/pull/53028> demonstrates
        integration with AQE and interactions with other
        rules(Exchange reuse, Shuffle Partition Coalescing ect).
        I would genuinely appreciate it if you could take a look at
        the POC PR to see the scope00 of changes. The primary logic is
        encapsulated within a new Spark Physical Planner Rule
        
<https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
        that injects the consolidation stage, which is the main crux.

        I welcome any further questions or comments!

        Thanks & Regards
        Karuppayya

        On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan
        <[email protected]> wrote:


            There are existing Apache projects which provide the
            capabilities which largely addresses the problem statement
            - Apache Celeborn, Apache Uniffle, Zeus, etc.
            Doing an explicit sort stage, between "map" and "reduce"
            brings with it some nice advantages, especially if the
            output is durable, but that would be a nontrivial change -
            and should be attempted if the benefits are being
            leveraged throughout the stack (AQE, speculative
            execution, etc)

            Regards,
            Mridul

            On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan
            <[email protected]> wrote:

                Hi Karuppayya,

                Handling large shuffles in Spark is challenging and
                it's great to see proposals addressing it. I think the
                extra "shuffle consolidation stage" is a good idea,
                and now I feel it's better for it to be synchronous,
                so that we can integrate it with AQE and leverage the
                accurate runtime shuffle status to make decisions
                about whether or not to launch this extra "shuffle
                consolidation stage" and how to consolidate. This is a
                key differentiator compared to the push-based shuffle.

                However, there are many details to consider, and in
                general it's difficult to use cloud storage for
                shuffle. We need to deal with problems like traffic
                throttling, cleanup guarantee, cost control, and so
                on. Let's take a step back and see what are the actual
                problems of large shuffles.

                Large shuffle usually starts with a large number of
                mappers that we can't adjust (e.g. large table scan).
                We can adjust the number of reducers to reach two goals:
                1. The input data size of each reducer shouldn't be
                too large, which is roughly *total_shuffle_size /
                num_reducers*. This is to avoid spilling/OOM during
                reducer task execution.
                2. The data size of each shuffle block shoudn't be too
                small, which is roughly *total_shuffle_size /
                (num_mappers * num_reducers)*. This is for the good of
                disk/network IO.

                These two goals are actually contradictory and
                sometimes we have to prioritize goal 1 (i.e. pick a
                large *num_reducers*) so that the query can finish. An
                extra "shuffle consolidation stage" can kind of
                decrease the number of mappers, by merging the shuffle
                files from multiple mappers. This can be a clear win
                as fetching many small shuffle blocks can be quite
                slow, even slower than running an extra "shuffle
                consolidation stage".

                In addition, the nodes that host shuffle files
                shouldn't be too many (best to be 0 which means
                shuffle files are stored in a different storage). With
                a large number of mappers, likely every node in the
                cluster stores some shuffle files. By merging shuffle
                files via the extra "shuffle consolidation stage", we
                can decrease the number of nodes that host active
                shuffle data, so that the cluster is more elastic.


                Thanks,
                Wenchen

                On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi
                <[email protected]> wrote:

                    Hi Karuppayya,

                    Thanks for sharing the proposal and this looks
                    very exciting.

                    I have a few questions and please correct me if I
                    misunderstood anything.

                    Would it be possible to clarify whether the
                    consolidated shuffle file produced for each
                    partition is suitable across all storage systems,
                    especially when this file becomes extremely large?
                    I am wondering if a very large file could create
                    problems during retrieval. For example, if a
                    connection breaks while reading the file, some
                    storage systems may not support resuming reads
                    from the point of failure and start reading the
                    file from the beginning again. This could lead to
                    higher latency, repeated retries, or performance
                    bottlenecks when a partition becomes too large or
                    skewed?

                    Would it make sense to introduce a configurable
                    upper-bound on the maximum allowed file size? This
                    might prevent the file from growing massively.
                    Should the consolidated shuffle file be compressed
                    before being written to the storage system.
                    Compression might introduce additional latency but
                    that too can be a configurable option.

                    Regards,
                    Rishab Joshi





                    On Thu, Nov 13, 2025 at 9:14 AM karuppayya
                    <[email protected]> wrote:

                        Enrico,
                        Thank you very much for reviewing the doc.

                            /Since the consolidation stage reads all
                            the shuffle data, why not doing the
                            transformation in that stage? What is the
                            point in deferring the transformations
                            into another stage?/


                        The reason for deferring the final
                        consolidation to a subsequent stage lies in
                        the distributed nature of shuffle data.
                        Reducer requires reading all corresponding
                        shuffle data written across all map tasks.
                        Since each mapper only holds its own local
                        output, the consolidation cannot begin until
                        all the map stage completes.

                        However, your question is also aligned to one
                        of the approaches mentioned (concurrent
                        consolidation
                        
<https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
                        which was specifically considered.

                        While the synchronous consolidation happens
                        afetr all the data is available , concurrent
                        consolidation can aggregate and persist the
                        already-generated shuffle data to begin
                        concurrently with the remaining map tasks,
                        thereby making the shuffle durable much
                        earlier instead of having to wait for all map
                        tasks to complete.

                        - Karuppayya

                        On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack
                        <[email protected]> wrote:

                            Hi,

                            another remark regarding a remote shuffle
                            storage solution:
                            As long as the map executors are alive,
                            reduce executors should read from them to
                            avoid any extra delay / overhead.
                            On fetch failure from a map executor, the
                            reduce executors should fall back to a
                            remote storage that provides a copy
                            (merged or not) of the shuffle data.

                            Cheers,
                            Enrico


                            Am 13.11.25 um 09:42 schrieb Enrico Minack:

                            Hi Karuppayya,

                            thanks for your proposal and bringing up
                            this issue.

                            I am very much in favour of a shuffle
                            storage solution that allows for dynamic
                            allocation and node failure in a K8S
                            environment, without the burden of
                            managing an Remote Shuffle Service.

                            I have the following comments:

                            Your proposed consolidation stage is
                            equivalent to the next reducer stage in
                            the sense that it reads shuffle data from
                            the earlier map stage. This requires the
                            executors of the map stage to survive
                            until the shuffle data are consolidated
                            ("merged" in Spark terminology).
                            Therefore, I think this passage of your
                            design document is not accurate:

                                Executors that perform the initial
                            map tasks (shuffle writers) can be
                            immediately deallocated after writing
                            their shuffle data ...

                            Since the consolidation stage reads all
                            the shuffle data, why not doing the
                            transformation in that stage? What is the
                            point in deferring the transformations
                            into another stage?

                            You mention the "Native Shuffle Block
                            Migration" and say its limitation is "It
                            simply shifts the storage burden to other
                            active executors".
                            Please consider that the migration
                            process can migrate to a (in Spark
                            called) fallback storage, which
                            essentially copies the shuffle data to a
                            remote storage.

                            Kind regards,
                            Enrico

                            Am 13.11.25 um 01:40 schrieb karuppayya:
                             Hi All,

                            I propose to utilize *Remote Storage as
                            a Shuffle Store, natively in Spark* .

                            This approach would fundamentally
                            decouple shuffle storage from compute
                            nodes, mitigating *shuffle fetch
                            failures and also help with
                            aggressive downscaling*.

                            The primary goal is to enhance the
                            *elasticity and resilience* of Spark
                            workloads, leading to substantial cost
                            optimization opportunities.

                            *I welcome any initial thoughts or
                            concerns regarding this idea.*

                            *Looking forward to your feedback! *

                            JIRA: SPARK-53484
                            <https://issues.apache.org/jira/browse/SPARK-54327>
                            SPIP doc
                            
<https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>,

                            Design doc
                            
<https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
                            PoC PR
                            <https://github.com/apache/spark/pull/53028>


                            Thanks,
                            Karuppayya





-- Regards
                    Rishab Joshi

Reply via email to