Currently, a barrier TaskSet has a hard requirement that all of its tasks be launched in a single resourceOffers round with enough slots (or say, sufficient resources); yet even with enough slots the launch cannot be guaranteed, due to task locality delay scheduling (also see the discussion at https://github.com/apache/spark/pull/21758#discussion_r204917245 ). So it is very likely that a barrier TaskSet obtains a chunk of sufficient resources after all that trouble, but lets it go easily just because one of its pending tasks cannot be scheduled. It is also hard to ensure all tasks launch at the same time, which brings complexity for fault tolerance. Furthermore, it causes severe resource competition between TaskSets and jobs, and introduces unclear semantics for DynamicAllocation (see the discussion at https://github.com/apache/spark/pull/21758#discussion_r204917880 ).
So, here, I want to introduce a new mechanism for barrier scheduling, called the WorkerOffer Reservation Mechanism. With it, a barrier TaskSet can reserve WorkerOffers across multiple resourceOffers() rounds, and launch all of its tasks at the same time once it has accumulated sufficient resources. The whole process looks like this (a minimal sketch of the bookkeeping is attached at the end of this mail):

* [1] CoarseGrainedSchedulerBackend calls TaskScheduler#resourceOffers(offers).
* [2] In resourceOffers(), we first exclude the cpus reserved by barrier TaskSets in previous resourceOffers() rounds.
* [3] If a task (CPUS_PER_TASK = 2) from a barrier TaskSet could launch on a WorkerOffer(hostA, cores=5), let's say WO1, then we reserve 2 cpus from WO1 for that task. So, in the next resourceOffers() round, those 2 cpus are excluded from WO1; in other words, WO1 then has only 3 cpus to offer. And we regard this task as a ready task.
* [4] After one or multiple resourceOffers() rounds, once the barrier TaskSet's number of ready tasks reaches its numTasks, we launch all of the ready tasks at the same time.

Besides, two features come along with the WorkerOffer Reservation Mechanism:

* To avoid the deadlock that several barrier TaskSets could introduce by holding their reserved WorkerOffers for a long time, we ask barrier TaskSets to force-release part of their reserved WorkerOffers on demand. So it is highly likely that every barrier TaskSet gets launched in the end.
* While waiting for sufficient resources, a barrier TaskSet can replace a WorkerOffer reserved at a higher (less local) locality level with a new WorkerOffer at a lower (more local) locality level, to achieve better locality in the end.

There is also a possibility for the WorkerOffer Reservation Mechanism to work with ExecutorAllocationManager (aka DynamicAllocation): when cpus in a WorkerOffer are reserved, we send a new event, called ExecutorReservedEvent, to EAM, which indicates that the corresponding executor's resources are being reserved. On receiving that event, EAM should not regard the executor as idle and remove it later; instead, it keeps the executor (maybe for a confined time), as it knows someone may use it later. Similarly, we send an ExecutorReleasedEvent when reserved cpus are released. (The second sketch at the end of this mail illustrates this.)

The WorkerOffer Reservation Mechanism will not impact non-barrier TaskSets; their behavior remains the same.

To summarize:

* the WorkerOffer Reservation Mechanism relaxes the requirement on resources, since a barrier TaskSet can be launched after multiple resourceOffers() rounds;
* barrier tasks are guaranteed to be launched at the same time;
* it provides a possibility to work with ExecutorAllocationManager.

Actually, I've already filed JIRA SPARK-26439 and PR #24010 for this (though they have gotten little attention); anyone interested can look at the code there directly. So, does anyone have any thoughts on this? (Personally, I think it would really do good for barrier scheduling.)
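For concreteness, below is a minimal, self-contained Scala sketch of the reservation bookkeeping in steps [1]-[4]. Everything here is hypothetical: BarrierReservationManager and the simplified WorkerOffer are stand-ins that only mirror the shape of TaskSchedulerImpl#resourceOffers, not the actual change (for that, see SPARK-26439 / PR #24010).

    // Simplified stand-in for Spark's WorkerOffer (hypothetical, not the real class).
    case class WorkerOffer(executorId: String, host: String, cores: Int)

    // Hypothetical per-TaskSet bookkeeping for reserved cpus and ready tasks.
    class BarrierReservationManager(numTasks: Int, cpusPerTask: Int) {
      import scala.collection.mutable
      // executorId -> cpus currently reserved on that executor for this TaskSet
      private val reserved = mutable.Map.empty[String, Int].withDefaultValue(0)
      private var readyTasks = 0

      // Step [2]: shrink each incoming offer by what was reserved in earlier rounds.
      def excludeReserved(offers: Seq[WorkerOffer]): Seq[WorkerOffer] =
        offers.map(o => o.copy(cores = o.cores - reserved(o.executorId)))

      // Step [3]: reserve cpus out of the current offers for as many pending
      // tasks as possible; each successful reservation marks one task "ready".
      def tryReserve(offers: Seq[WorkerOffer]): Unit =
        for (offer <- offers) {
          var free = offer.cores
          while (free >= cpusPerTask && readyTasks < numTasks) {
            reserved(offer.executorId) += cpusPerTask
            free -= cpusPerTask
            readyTasks += 1
          }
        }

      // Step [4]: launch only when every task in the TaskSet holds a reservation.
      def readyToLaunchAll: Boolean = readyTasks == numTasks

      // Forced release, for deadlock avoidance between barrier TaskSets.
      def forceRelease(executorId: String, cpus: Int): Unit = {
        val freed = math.min(cpus, reserved(executorId))
        reserved(executorId) -= freed
        readyTasks -= freed / cpusPerTask
      }
    }

For example, with numTasks = 3 and cpusPerTask = 2, an offer of 3 cores on hostA makes one task ready; a later offer of 5 cores on hostB makes the remaining two ready, and only then does readyToLaunchAll become true.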
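Similarly, here is a sketch of the DynamicAllocation side. ExecutorReservedEvent and ExecutorReleasedEvent are the events proposed in this mail, and AllocationManagerStub is again a hypothetical stand-in, not the real ExecutorAllocationManager API:

    // Events proposed above; the names are part of this proposal, not existing Spark API.
    sealed trait ReservationEvent
    case class ExecutorReservedEvent(executorId: String, cpus: Int) extends ReservationEvent
    case class ExecutorReleasedEvent(executorId: String, cpus: Int) extends ReservationEvent

    // Stand-in for the EAM bookkeeping: track reserved cpus per executor so the
    // idle-timeout removal path can skip executors that hold reservations.
    class AllocationManagerStub {
      import scala.collection.mutable
      private val reservedCpus = mutable.Map.empty[String, Int].withDefaultValue(0)

      def onEvent(event: ReservationEvent): Unit = event match {
        case ExecutorReservedEvent(id, cpus) => reservedCpus(id) += cpus
        case ExecutorReleasedEvent(id, cpus) =>
          reservedCpus(id) = math.max(0, reservedCpus(id) - cpus)
      }

      // An executor holding reservations is not idle and should not be removed
      // (maybe only within a confined grace period, as suggested above).
      def isIdle(executorId: String): Boolean = reservedCpus(executorId) == 0
    }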