Hi Robert,

I agree with everything that you and Stephan are saying. I haven't looked into the Flink codebase or the papers/design docs closely enough to comment at a finer level, so maybe that's the first piece of homework that I need to do (need pointers for that).
And yes, I would definitely be interested in owning/maintaining/extending the flink-mesos integration.

-- Ankur Chauhan

> On 19 Aug 2015, at 13:05, Robert Metzger [via Apache Flink Mailing List archive.] <ml-node+s1008284n757...@n3.nabble.com> wrote:
>
> Hi,
>
> I'm sorry for the late reply, I'm still working through a long email backlog from a one-week vacation ;)
> Thank you for the long reply in this thread. Your observations are very good.
> I think we should indeed work towards more fine-grained "intra-job" elasticity. Let me comment on some of your statements below ...
>
> > * The taskManagers that are being killed off may have resources that are needed by other tasks, so they can't always be killed off (files/intermediate results etc.). This means that there needs to be some sort of "are you idle?" handshake.
>
> I think we have to distinguish between streaming and batch API jobs here.
> - For deployed streaming jobs, it's usually impossible to take away TaskManagers, because we are working on infinite streams (tasks are never done). The simplest thing we can do is stop machines that no tasks are deployed to.
> As Stephan mentioned, dynamic scaling of streaming jobs is certainly something interesting for the future. There, we would need a component which implements some sort of scaling policy (for example, based on throughput, load or latency). For up- or down-scaling, we would then redeploy the job. For this feature, we certainly need nicely abstracted APIs for YARN and Mesos to alter the running cluster.
> - Batch jobs are usually executed in a pipelined (i.e. streaming) fashion; we would need to execute them in a batch fashion instead (otherwise, tasks do not finish one after another). Flink's runtime already has support for that. With some additional logic that recognizes when an intermediate dataset has been fully consumed by downstream tasks, we can safely deallocate machines in a Flink cluster. I think such logic can be implemented in the JobManager's scheduler.
>
> > * Ideally, we would want to isolate the logic (a general scheduler) that says "get me a slot meeting constraints X" into one module which utilises another module (YARN or Mesos) that takes such a request and satisfies the needs of the former. This idea is sort of inspired by the way this separation exists in Apache Spark and seems to work out well.
>
> The JobManager of Flink has a component which schedules a job graph in the cluster. I think right now the system assumes that a certain number of machines and processing slots are available.
> But it should not be too difficult to have something like "fake" machines and slots there which are allocated on demand as needed (so that you basically give the system an upper limit of resources to allocate).
>
> I agree with Stephan that a good first step towards fine-grained elasticity would be a common interface for both YARN and Mesos.
> For YARN, there are currently these (pretty YARN-specific) abstract classes:
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
>
> I'd suggest that we first merge the "flink-mesos" integration once it's done. After that, we can try to come up with a common interface.
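To make the common-interface idea above concrete, here is a minimal sketch of what such an abstraction might look like. All names below are hypothetical; none of these types exist in the Flink codebase (only the YARN-specific classes linked above do):

    /** Deploys a Flink cluster on some resource manager (YARN, Mesos, ...). */
    interface ClusterDescriptor {
        /** Bring up a cluster with the given TaskManager count and memory (MB) per TaskManager. */
        ClusterHandle deploy(int numTaskManagers, int taskManagerMemoryMb) throws Exception;
    }

    /** Handle to a running cluster, supporting elastic resizing. */
    interface ClusterHandle {
        void scaleUp(int additionalTaskManagers);   // ask the resource manager for more TaskManagers
        void scaleDown(int taskManagersToRelease);  // give idle TaskManagers back
        int numRegisteredTaskManagers();            // TaskManagers currently known to the JobManager
        void shutdown();                            // tear the whole cluster down
    }

A YARN implementation would back these with container requests and a Mesos implementation with resource offers, while the JobManager programs only against the interface.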
>
> Are you interested in working towards that feature after the "flink-mesos" integration?
>
> Best,
> Robert
>
>
> On Tue, Aug 11, 2015 at 10:22 AM, ankurcha <[hidden email]> wrote:
>
> > Hi Stephan / others interested,
> >
> > I have been working on the flink-mesos integration and there are definitely some thoughts that I would like to share about the commonalities with the flink-yarn integration.
> >
> > * Both the flink-mesos and flink-yarn integrations as they stand today can be considered "coarse-grained" scheduling integrations. This means that the tasks that are spawned (the task managers) are long-lived.
> >
> > * MaGuoWei is referring to something (as correctly identified by Stephan) that I like to call "fine-grained" scheduling integration, where the task managers are relinquished by the framework when they aren't being utilised by Flink. This means that when the next job is executed, the job manager and/or framework will spawn new task managers. This also has an implied requirement that each taskManager runs one task and is then discarded.
> >
> > * Coarse-grained scheduling is preferable when we want interactive (sub-second) responses, and waiting for a resource offer to be accepted and a new taskManager JVM to spin up is not acceptable. The downside is that long-running tasks may lead to underutilisation of the shared cluster.
> >
> > * Fine-grained scheduling is preferable when a little delay (due to starting a new taskManager JVM) is acceptable. This means that we will have higher utilisation of the cluster in a shared setting, as resources that aren't being used are relinquished. But we need to think about this approach a lot more carefully. Some of the cases that I can think of are:
> >   * The jobManager/integration-framework may need to monitor the utilisation of the taskManagers and kill off taskManagers based on some cool-down timeout.
> >   * The taskManagers that are being killed off may have resources that are needed by other tasks, so they can't always be killed off (files/intermediate results etc.). This means that there needs to be some sort of "are you idle?" handshake.
> >   * I like the "fine-grained" mode, but there may need to be a middle ground where tasks are "coarse-grained", i.e. run multiple operators and, once idle for a certain amount of time, are reaped/killed off by the jobManager/integration-framework.
> >
> > * Ideally, we would want to isolate the logic (a general scheduler) that says "get me a slot meeting constraints X" into one module which utilises another module (YARN or Mesos) that takes such a request and satisfies the needs of the former. This idea is sort of inspired by the way this separation exists in Apache Spark and seems to work out well.
> >
> > * I don't know the codebase well enough to say where these things should go. Based on my reading of the overall architecture of the system, there is nothing that can't be satisfied by the flink-runtime, and it *should* not need any detailed access to the execution plan. I'll defer this to someone who knows the internals better.
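To illustrate the "get me a slot meeting constraints X" split from the last two points, here is a minimal sketch. It assumes hypothetical SlotConstraints/ResourceBackend/Slot types; nothing here is existing Flink API:

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    /** Resource constraints for one slot request. */
    class SlotConstraints {
        final double cpuCores;
        final int memoryMb;
        final Map<String, String> attributes; // e.g. rack or host preferences

        SlotConstraints(double cpuCores, int memoryMb, Map<String, String> attributes) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
            this.attributes = attributes;
        }
    }

    /** What the general scheduler needs from YARN/Mesos: satisfy slot requests. */
    interface ResourceBackend {
        /** Asynchronously obtain a slot meeting the constraints (may start a new taskManager). */
        CompletableFuture<Slot> requestSlot(SlotConstraints constraints);

        /** Return a slot so the backend can reap idle taskManagers after a cool-down. */
        void releaseSlot(Slot slot);
    }

    /** Opaque handle to an allocated slot on some taskManager. */
    interface Slot {
        String taskManagerId();
    }

A Mesos backend would match the constraints against incoming resource offers, while a YARN backend would translate them into container requests; the general scheduler itself stays agnostic of either.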
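Similarly, the "are you idle?" handshake with a cool-down timeout could look roughly like the sketch below: the jobManager/integration-framework only reaps a taskManager that has reported itself idle (no running tasks, no intermediate results still referenced) for longer than the cool-down. Again, all names are hypothetical:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class IdleReaper {
        private final Duration coolDown;
        // taskManagerId -> time at which it was first observed idle
        private final Map<String, Instant> idleSince = new ConcurrentHashMap<>();

        IdleReaper(Duration coolDown) {
            this.coolDown = coolDown;
        }

        /** Called with each taskManager's periodic self-report. */
        void onStatusReport(String taskManagerId, boolean idle) {
            if (idle) {
                idleSince.putIfAbsent(taskManagerId, Instant.now());
            } else {
                // Any activity (running tasks or still-referenced intermediate
                // results) resets the cool-down.
                idleSince.remove(taskManagerId);
            }
        }

        /** True if the taskManager has been idle past the cool-down and may be released. */
        boolean maybeReap(String taskManagerId) {
            Instant since = idleSince.get(taskManagerId);
            return since != null
                    && Duration.between(since, Instant.now()).compareTo(coolDown) > 0;
        }
    }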