Felix - please add me to this event. Ben - should we move this proposal to a doc and open it up for edits/comments?
On Wed, Nov 20, 2019 at 5:37 PM Felix Cheung <felixcheun...@hotmail.com> wrote:

Great!

Due to a number of constraints I won't be sending the link directly here, but please reply to me and I will add you.

------------------------------
From: Ben Sidhom <sid...@google.com.INVALID>
Sent: Wednesday, November 20, 2019 9:10:01 AM
To: John Zhuge <jzh...@apache.org>
Cc: bo yang <bobyan...@gmail.com>; Amogh Margoor <amo...@qubole.com>; Ryan Blue <rb...@netflix.com>; Ben Sidhom <sid...@google.com.invalid>; Spark Dev List <dev@spark.apache.org>; Christopher Crosbie <crosb...@google.com>; Griselda Cuevas <g...@google.com>; Holden Karau <hol...@pigscanfly.ca>; Mayank Ahuja <mah...@qubole.com>; Kalyan Sivakumar <kaly...@qubole.com>; alfo...@fb.com <alfo...@fb.com>; Felix Cheung <fel...@uber.com>; Matt Cheah <mch...@palantir.com>; Yifei Huang (PD) <yif...@palantir.com>
Subject: Re: Enabling fully disaggregated shuffle on Spark

That sounds great!

On Wed, Nov 20, 2019 at 9:02 AM John Zhuge <jzh...@apache.org> wrote:

That will be great. Please send us the invite.

On Wed, Nov 20, 2019 at 8:56 AM bo yang <bobyan...@gmail.com> wrote:

Cool, thanks Ryan, John, and Amogh for the replies! Great to see you interested! Felix will be hosting a Spark Scalability & Reliability Sync meeting on Dec 4 at 1pm PST. We could discuss more details there. Do you want to join?

On Tue, Nov 19, 2019 at 4:23 PM Amogh Margoor <amo...@qubole.com> wrote:

We at Qubole are also looking at disaggregating shuffle on Spark. Would love to collaborate and share learnings.

Regards,
Amogh

On Tue, Nov 19, 2019 at 4:09 PM John Zhuge <jzh...@apache.org> wrote:

Great work, Bo! Would love to hear the details.

On Tue, Nov 19, 2019 at 4:05 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

I'm interested in remote shuffle services as well. I'd love to hear about what you're using in production!

rb

On Tue, Nov 19, 2019 at 2:43 PM bo yang <bobyan...@gmail.com> wrote:

Hi Ben,

Thanks for the write-up! This is Bo from Uber. I am on Felix's team in Seattle, working on disaggregated shuffle (we call it remote shuffle service, or RSS, internally). We have had RSS in production for a while and learned a lot along the way (we tried quite a few techniques to improve remote shuffle performance). We would be happy to share our learnings with the community, and we would also like to hear feedback and suggestions on how to further improve remote shuffle performance. We can chat in more detail if you or other people are interested.

Best,
Bo

On Fri, Nov 15, 2019 at 4:10 PM Ben Sidhom <sid...@google.com.invalid> wrote:

I would like to start a conversation about extending the Spark shuffle manager surface to support fully disaggregated shuffle implementations. This is closely related to the work in SPARK-25299 <https://issues.apache.org/jira/browse/SPARK-25299>, which is focused on refactoring the shuffle manager API (and in particular, SortShuffleManager) to use a pluggable storage backend. The motivation for that SPIP is further enabling Spark on Kubernetes.

The motivation for this proposal is enabling fully externalized (disaggregated) shuffle service implementations. (Facebook's Cosco shuffle <https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service> is one example of such a disaggregated shuffle service.) These changes allow the bulk of the shuffle to run in a remote service so that minimal state resides in executors and local disk spill is minimized. The net effect is increased job stability and performance improvements in certain scenarios. These changes should work well with, and are complementary to, SPARK-25299; some or all of these points may be merged into that issue as appropriate.

Below is a description of each component of this proposal. These changes can ideally be introduced incrementally. I would like to gather feedback and gauge interest from others in the community to collaborate on this. There are likely more points that would be useful to disaggregated shuffle services; we can outline a more concrete plan after gathering enough input. A working session could help us kick off this joint effort, perhaps in the mid-January to mid-February timeframe (depending on interest and availability). I'm happy to host at our Sunnyvale, CA offices.

Proposal

Scheduling and re-executing tasks

Allow coordination between the service and the Spark DAG scheduler as to whether a given block/partition needs to be recomputed when a task fails or when shuffle block data cannot be read. Such coordination is important, e.g., for suppressing recomputation after aborted executors or for forcing late recomputation when the service internally acts as a cache. One catch-all solution is to have the shuffle manager provide an indication of whether shuffle data is external to executors (or nodes). Another option is to allow the shuffle manager (likely on the driver) to be queried for the existence of shuffle data for a given executor ID (or perhaps map task, reduce task, etc.). Note that this is at the level of data the scheduler is aware of (i.e., map/reduce partitions) rather than block IDs, which are internal details of some shuffle managers.

ShuffleManager API

Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the service knows the data is still active. This is one way to enable time- or job-scoped data, because a disaggregated shuffle service cannot rely on robust communication with Spark and in general has a lifecycle distinct from that of the Spark deployment(s) it talks to. This would likely take the form of a callback on ShuffleManager itself, but there are other approaches.

Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle connections/streams/file handles, as well as to provide commit semantics). SPARK-25299 adds commit semantics to the internal data storage layer, but this is applicable to all shuffle managers at a higher level and should apply equally to the ShuffleWriter.

Do not require ShuffleManagers to expose ShuffleBlockResolvers where they are not needed. Ideally, this would be an implementation detail of the shuffle manager itself. If there is substantial overlap between the SortShuffleManager and other implementations, the storage details can be abstracted at the appropriate level. (SPARK-25299 does not currently change this.)

Do not require MapStatus to include block manager IDs where they are not relevant. This is captured by ShuffleBlockInfo <https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit#heading=h.imi27prnziyj> including an optional BlockManagerId in SPARK-25299. However, this change should be lifted to the MapStatus level so that it applies to all ShuffleManagers. Alternatively, use a more general data-location abstraction than BlockManagerId. This gives the shuffle manager more flexibility and the scheduler more information with respect to data residence. (A sketch of one possible shape for these hooks follows below.)
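To make the shape of these hooks concrete, here is a minimal sketch in Scala. It is hypothetical interface design for discussion only: none of these trait or method names (ShuffleDataLocation, DisaggregatedShuffleSupport, CommittingShuffleWriter) exist in Spark today, and the signatures are placeholders.

```scala
// Hypothetical API sketch: nothing here exists in Spark today. These
// are placeholders to anchor discussion of the hooks described above.

// A data-location abstraction more general than BlockManagerId:
// shuffle output may live on an executor or in an external service.
sealed trait ShuffleDataLocation
case object ExternalService extends ShuffleDataLocation
final case class OnExecutor(executorId: String) extends ShuffleDataLocation

// Extensions a disaggregated shuffle manager might expose to the
// driver and DAG scheduler.
trait DisaggregatedShuffleSupport {
  // Scheduler coordination: is the output of this map partition still
  // readable, or must the task be recomputed (e.g., after an executor
  // is aborted, or after the service evicts internally cached data)?
  def shuffleOutputAvailable(shuffleId: Int, mapPartitionId: Int): Boolean

  // Where the output resides, so that executor loss only invalidates
  // data that actually lived on the lost executor.
  def dataLocation(shuffleId: Int, mapPartitionId: Int): ShuffleDataLocation

  // Keep-alive callback: invoked periodically while the shuffle output
  // is still referenced, letting a service with its own distinct
  // lifecycle expire data that is no longer kept alive.
  def heartbeat(shuffleId: Int): Unit
}

// Writer-level lifecycle/commit hooks, lifted above the storage layer
// (SPARK-25299 adds commit semantics only at the storage level).
trait CommittingShuffleWriter[K, V] {
  def write(records: Iterator[Product2[K, V]]): Unit
  def commit(): Unit                 // atomically publish this task's output
  def abort(cause: Throwable): Unit  // discard partial output, release handles
}
```

The data-location abstraction is what would let the scheduler decide whether an executor loss invalidates map output: output held by an external service survives, while output resident on the lost executor must be recomputed.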
Serialization

Allow serializers to be used more flexibly and efficiently. For example, have serializers support writing an arbitrary number of objects into an existing OutputStream or ByteBuffer. This enables objects to be serialized into direct buffers where doing so makes sense. More importantly, it allows arbitrary metadata/framing data to be wrapped around individual objects cheaply. Right now, that is only possible at the stream level. (There are hacks around this, but this change would enable more idiomatic use in efficient shuffle implementations.)

Have serializers indicate whether they are deterministic. This provides much of the value of a shuffle service, because it means that reducers do not need to spill to disk when reading/merging/combining inputs: the data can be grouped by the service, even without the service understanding data types or byte representations. An alternative (less preferable, since it would break Java serialization, for example) would be to require all serializers to be deterministic. A sketch of such a serializer surface follows below.
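As with the hooks above, here is a minimal hypothetical sketch of such a serializer surface; these traits and methods (FlexibleSerializerInstance, DeterministicSerializer) do not exist in Spark's Serializer API, which today exposes serialization through stream and whole-object calls.

```scala
import java.io.OutputStream
import java.nio.ByteBuffer
import scala.reflect.ClassTag

// Hypothetical serializer extensions for discussion; these methods do
// not exist on Spark's SerializerInstance today.
trait FlexibleSerializerInstance {
  // Serialize one object into a caller-supplied stream, so framing or
  // per-record metadata can be interleaved cheaply around records.
  def writeObject[T: ClassTag](obj: T, out: OutputStream): Unit

  // Serialize directly into a caller-supplied (possibly direct/off-heap)
  // buffer; returns the number of bytes written.
  def writeObject[T: ClassTag](obj: T, buffer: ByteBuffer): Int
}

trait DeterministicSerializer {
  // True iff equal objects always serialize to identical bytes, so a
  // shuffle service can group/merge records by raw byte comparison
  // without understanding data types or byte representations.
  def isDeterministic: Boolean
}
```

The determinism flag carries the key property: if equal records always produce identical bytes, the service can group and merge records without deserializing them.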
--
- Ben

--
Ryan Blue
Software Engineer
Netflix

--
John Zhuge

--
-Ben

--
"...:::Aniket:::... Quetzalco@tl"