Hi Ben,

In general everything you're proposing sounds reasonable. For me, at least, I'd need more details on most of the points before I fully understand them, but I'm definitely in favor of the general goal of making Spark support fully disaggregated shuffle. Of course, I also want to make sure it can be done in a way that involves the least risky changes to Spark itself, and that we can continue to support.
One very-high-level point which I think is worth keeping in mind for the wider community following this -- the key difference between what you are proposing and SPARK-25299 is that SPARK-25299 still uses Spark's existing shuffle implementation, which leverages local disk. Your goal is to better support shuffling all data via some external service, which avoids shuffle data hitting the executors' local disks entirely. This was already possible, to some extent, even before SPARK-25299 with the ShuffleManager API; but as you note, there are shortcomings which need to be addressed. (Historical note: that API wasn't designed with totally distributed shuffle services in mind; it was there to support hash- vs. sort-based shuffle, all still on Spark's executors.)

One thing I thought you would need, but which you didn't mention here, is a change to the scheduler to add an extra step between the shuffle-write and shuffle-read stages, in case the service needs to do any work to reorganize the data before it can be read. I think I have heard this come up in prior discussions.

A couple of inline comments below:

On Fri, Nov 15, 2019 at 6:10 PM Ben Sidhom <sid...@google.com.invalid> wrote:

> Proposal
> Scheduling and re-executing tasks
>
> Allow coordination between the service and the Spark DAG scheduler as to
> whether a given block/partition needs to be recomputed when a task fails or
> when shuffle block data cannot be read. Having such coordination is
> important, e.g., for suppressing recomputation after aborted executors or
> for forcing late recomputation if the service internally acts as a cache.
> One catchall solution is to have the shuffle manager provide an indication
> of whether shuffle data is external to executors (or nodes). Another
> option: allow the shuffle manager (likely on the driver) to be queried for
> the existence of shuffle data for a given executor ID (or perhaps map task,
> reduce task, etc.). Note that this is at the level of data the scheduler is
> aware of (i.e., map/reduce partitions) rather than block IDs, which are
> internal details for some shuffle managers.

Sounds reasonable, and I think @Matt Cheah <mch...@palantir.com> mentioned that something like this has come up in their work on SPARK-25299 and was going to be added even for that work. (Of course, we'd need to look at the actual proposal closely, and at how it impacts the scheduler.)

> ShuffleManager API
>
> Add a heartbeat (keep-alive) mechanism to RDD shuffle output so that the
> service knows that data is still active. This is one way to enable
> time-/job-scoped data because a disaggregated shuffle service cannot rely
> on robust communication with Spark and in general has a distinct lifecycle
> from the Spark deployment(s) it talks to. This would likely take the form
> of a callback on ShuffleManager itself, but there are other approaches.

I believe this can already be done, but maybe it's much uglier than it needs to be (though I don't recall the details off the top of my head).

> Add lifecycle hooks to shuffle readers and writers (e.g., to close/recycle
> connections/streams/file handles as well as provide commit semantics).
> SPARK-25299 adds commit semantics to the internal data storage layer, but
> this is applicable to all shuffle managers at a higher level and should
> apply equally to the ShuffleWriter.

ShuffleWriter already has a def stop(success: Boolean): Option[MapStatus], which is meant to be exactly that kind of hook; I would need more info about why that isn't enough. (But if there is a need for it, yes, this makes sense.)
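For reference, the writer half of the plugin API today is roughly the following (paraphrased from Spark's shuffle package; details vary a bit across versions):

    package org.apache.spark.shuffle

    import java.io.IOException

    import org.apache.spark.scheduler.MapStatus

    // Paraphrase of the existing internal ShuffleWriter contract. The
    // lifecycle is: write() is called with the map task's records, then
    // stop(success = true) acts as the commit point (returning the
    // MapStatus to register with the driver), while stop(success = false)
    // acts as the abort/cleanup path.
    private[spark] abstract class ShuffleWriter[K, V] {

      /** Write a sequence of records to this task's output. */
      @throws[IOException]
      def write(records: Iterator[Product2[K, V]]): Unit

      /** Close this writer, passing along whether the map task completed. */
      def stop(success: Boolean): Option[MapStatus]
    }

So the concrete question is what commit semantics you need beyond that boolean success flag.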
> Serialization
>
> Allow serializers to be used more flexibly and efficiently. For example,
> have serializers support writing an arbitrary number of objects into an
> existing OutputStream or ByteBuffer. This enables objects to be serialized
> to direct buffers where doing so makes sense. More importantly, it allows
> arbitrary metadata/framing data to be wrapped around individual objects
> cheaply. Right now, that’s only possible at the stream level. (There are
> hacks around this, but this would enable more idiomatic use in efficient
> shuffle implementations.)

I don't really understand how this is different from the existing SerializationStream -- probably a small example would clarify. (I've pasted the existing API in the PS below for comparison.)

> Have serializers indicate whether they are deterministic. This provides
> much of the value of a shuffle service because it means that reducers do
> not need to spill to disk when reading/merging/combining inputs--the data
> can be grouped by the service, even without the service understanding data
> types or byte representations. Alternative (less preferable since it would
> break Java serialization, for example): require all serializers to be
> deterministic.

I really don't understand this one, sorry -- can you elaborate more? I'm not sure what determinism has to do with spilling to disk. There is already supportsRelocationOfSerializedObjects, though that is private, which seems related; but I think you're talking about something else? (See the second sketch in the PS below.)

thanks,
Imran
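PS: For reference on the serialization point, the existing API is roughly the following (paraphrased; the real classes also have writeKey/writeValue variants and a matching deserialization side):

    package org.apache.spark.serializer

    import java.io.OutputStream
    import java.nio.ByteBuffer
    import scala.reflect.ClassTag

    // Paraphrase of the existing serializer API: a SerializationStream is
    // bound to a single OutputStream when it is created, and objects are
    // then written one at a time into that stream.
    abstract class SerializerInstance {
      def serialize[T: ClassTag](t: T): ByteBuffer
      def serializeStream(s: OutputStream): SerializationStream
    }

    abstract class SerializationStream {
      def writeObject[T: ClassTag](t: T): SerializationStream
      def flush(): Unit
      def close(): Unit
    }

If the limitation you mean is that a stream is tied to one OutputStream for its lifetime, so per-object framing requires either a new stream per record or tricks against the underlying OutputStream, then I can see it -- but a small example from you would still help.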
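PPS: On the determinism point, here is the existing flag I mentioned, next to a strawman of what I am guessing you mean (the isDeterministic name and semantics below are my guess, not anything that exists today):

    package org.apache.spark.serializer

    abstract class Serializer {
      // Existing (private[spark]) flag: serialized records can be
      // reordered relative to each other within a stream without being
      // deserialized and re-serialized, e.g. Kryo under some settings.
      private[spark] def supportsRelocationOfSerializedObjects: Boolean = false

      // Strawman of the proposed flag (my guess at the semantics): equal
      // objects always serialize to identical bytes, so an external
      // service could group/combine records by raw byte comparison
      // without understanding the data types.
      private[spark] def isDeterministic: Boolean = false
    }

Even with a flag like that, I don't yet see why it removes the need for reducer-side spilling, so an elaboration would help.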