Hi,

I'm sorry for the late reply, I'm still working through a long email backlog from a one-week vacation ;)

Thank you for the long reply in this thread. Your observations are very good. I think we should indeed work towards more fine-grained "intra-job" elasticity. Let me comment on some of your statements below ...
> * The taskManagers that are being killed off may have resources that are
> needed by other tasks, so they can't always be killed off
> (files/intermediate results etc). This means that there needs to be some
> sort of "are you idle?" handshake that needs to be done.

I think we have to distinguish between streaming and batch API jobs here.

- For deployed streaming jobs, it's usually impossible to take away TaskManagers, because we are working on infinite streams (tasks are never done). The simplest thing we can do is stop machines that no tasks are deployed to. As Stephan mentioned, dynamic scaling of streaming jobs is certainly something interesting for the future. There, we would need a component which implements some sort of scaling policy (for example based on throughput, load or latency; see the rough sketch further below). For scaling up or down, we would then redeploy the job. For this feature, we certainly need nicely abstracted APIs for YARN and Mesos to alter the running cluster.

- For batch jobs, which are usually executed in a pipelined (= streaming) fashion, we would need to execute them in a batch fashion (otherwise, tasks do not finish one after another). Flink's runtime already has support for that. With some additional logic that allows us to recognize when an intermediate dataset has been fully consumed by downstream tasks, we can safely deallocate machines in a Flink cluster. I think such logic can be implemented in the JobManager's scheduler.

> * Ideally, we would want to isolate the logic (a general scheduler) that
> says "get me a slot meeting constraints X" into one module which utilises
> another module (Yarn or Mesos) that takes such a request and satisfies the
> needs of the former. This idea is sort of inspired by the way this
> separation exists in Apache Spark and seems to work out well.

The JobManager of Flink has a component which schedules a job graph in the cluster. I think right now the system assumes that a certain number of machines and processing slots is available, but it should not be too difficult to have something like "fake" machines and slots there which are allocated on demand as needed (so that you basically give the system an upper limit of resources to allocate).

I agree with Stephan that a good first step towards fine-grained elasticity would be a common interface for both YARN and Mesos. For YARN, there are currently these (pretty YARN-specific) abstract classes:

https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
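Just to make this a bit more concrete, below is a very rough sketch of what such a common interface could look like. To be clear, none of these types exist in Flink today; the names (ClusterClient, ClusterResourceHandle, requestTaskManagers, ...) are placeholders for the discussion, loosely following the split between the two abstract YARN classes above, but generalised so that a Mesos implementation could plug in as well (in a real implementation, each type would of course live in its own file):

/** Deployment-specific client; one implementation for YARN, one for Mesos. */
public interface ClusterClient {

    /** Brings up a JobManager plus the given number of TaskManagers and returns a handle. */
    ClusterResourceHandle startCluster(int numTaskManagers, int slotsPerTaskManager) throws Exception;
}

/** Handle to a running cluster, used by the JobManager / scheduler side. */
public interface ClusterResourceHandle {

    /** Asks the resource manager (YARN or Mesos) for additional TaskManager containers. */
    void requestTaskManagers(int count);

    /** Releases TaskManagers that the scheduler has identified as idle. */
    void releaseTaskManagers(int count);

    /** Number of TaskManagers that are currently registered at the JobManager. */
    int getNumberOfTaskManagers();

    /** Tears the whole cluster down. */
    void shutdown();
}

A handle like this would also give the scheduler a hook for the "fake" slots mentioned above: a slot could be handed out immediately and backed by a requestTaskManagers() call that is only issued once a task is actually deployed into it.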
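For the streaming case, the scaling policy component mentioned earlier could then be a small strategy on top of such a handle. Again, this is only an illustration with made-up names (ScalingPolicy, JobMetrics), not an existing API:

/** Metrics the policy bases its decision on (throughput, load, latency, ...). */
interface JobMetrics {
    /** For example the fraction of busy task slots, between 0.0 and 1.0. */
    double averageLoad();
}

/** Decides how many TaskManagers a running job should have. */
interface ScalingPolicy {
    int desiredTaskManagers(JobMetrics metrics, int currentTaskManagers);
}

/** Simple threshold-based example: scale up under high load, scale down under low load. */
class LoadBasedScalingPolicy implements ScalingPolicy {

    private final double upperThreshold;
    private final double lowerThreshold;

    LoadBasedScalingPolicy(double upperThreshold, double lowerThreshold) {
        this.upperThreshold = upperThreshold;
        this.lowerThreshold = lowerThreshold;
    }

    @Override
    public int desiredTaskManagers(JobMetrics metrics, int current) {
        if (metrics.averageLoad() > upperThreshold) {
            return current + 1;   // request one more TaskManager, then redeploy the job
        }
        if (metrics.averageLoad() < lowerThreshold && current > 1) {
            return current - 1;   // release one TaskManager, then redeploy the job
        }
        return current;           // stay at the current size
    }
}

The policy itself is deliberately simple; the interesting part is wiring its decision to a redeployment of the job, which is exactly the open question for dynamic scaling of streaming jobs.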
I'd suggest that we first merge the "flink-mesos" integration once it's done. After that, we can try to come up with a common interface.

Are you interested in working towards that feature after the "flink-mesos" integration?

Best,
Robert

On Tue, Aug 11, 2015 at 10:22 AM, ankurcha <an...@malloc64.com> wrote:

> Hi Stephan / others interested,
>
> I have been working on the flink-mesos integration and there are
> definitely some thoughts that I would like to share about the
> commonalities with the flink-yarn integration.
>
> * Both flink-mesos and flink-yarn integration as they stand today can be
> considered "coarse-grained" scheduling integrations. This means that the
> tasks that are spawned (the task managers) are long-lived.
>
> * MaGuoWei is referring to something (as correctly identified by Stephan)
> that I like to call "fine-grained" scheduling integration, where the task
> managers are relinquished by the framework when they aren't being utilised
> by Flink. This means that when the next job is executed, the job manager
> and/or framework will spawn new task managers. This also has an implied
> requirement that each taskManager runs one task and is then discarded.
>
> * Coarse-grained scheduling is preferable when we want interactive
> (sub-second) response, and waiting for a resource offer to be accepted and
> for a new taskManager JVM to spin up is not acceptable. The downside is
> that long-running tasks may lead to underutilisation of the shared
> cluster.
>
> * Fine-grained scheduling is preferable when a little delay (due to
> starting a new taskManager JVM) is acceptable. This means that we will
> have higher utilisation of the cluster in a shared setting, as resources
> that aren't being used are relinquished. But we need to be a lot more
> careful about this approach. Some of the cases that I can think of are:
>   * The jobManager/integration-framework may need to monitor the
> utilisation of the taskManagers and kill off taskManagers based on some
> cool-down timeout.
>   * The taskManagers that are being killed off may have resources that
> are needed by other tasks, so they can't always be killed off
> (files/intermediate results etc). This means that there needs to be some
> sort of "are you idle?" handshake that needs to be done.
>   * I like the "fine-grained" mode, but there may need to be a middle
> ground where tasks are "coarse-grained", i.e. run multiple operators and,
> once idle for a certain amount of time, are reaped/killed off by the
> jobManager/integration-framework.
>
> * Ideally, we would want to isolate the logic (a general scheduler) that
> says "get me a slot meeting constraints X" into one module which utilises
> another module (Yarn or Mesos) that takes such a request and satisfies the
> needs of the former. This idea is sort of inspired by the way this
> separation exists in Apache Spark and seems to work out well.
>
> * I don't know the codebase well enough to say where these things should
> go. Based on my reading of the overall architecture of the system, there
> is nothing that can't be satisfied by the flink-runtime, and it *should*
> not need any detailed access to the execution plan. I'll defer this to
> someone who knows the internals better.
>
>
> --
> View this message in context:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/add-some-new-api-to-the-scheduler-in-the-job-manager-tp7153p7448.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at
> Nabble.com.