Hi,
one thing to also keep in mind here is that TaskManagers might have to keep
the intermediate data even after a job is finished. This happens, for
example, when the user has an interactive scala-shell session where they are
exploring some data and transforming it in several steps as they go.
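
To make that concrete, here is a minimal sketch of what an "are you idle?"
check could take into account. TaskManagerState and all of its methods are
hypothetical names made up for illustration, they are not existing Flink
classes or APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical per-TaskManager bookkeeping; not an existing Flink class.
class TaskManagerState {
    private int runningTasks = 0;
    // IDs of intermediate results still held here, including results kept
    // for an interactive session after their producing job has finished.
    private final Set<String> retainedResults = new HashSet<>();

    void taskStarted() { runningTasks++; }
    void taskFinished() { runningTasks--; }
    void retainResult(String resultId) { retainedResults.add(resultId); }
    void releaseResult(String resultId) { retainedResults.remove(resultId); }

    // Having no running tasks is not enough: the TaskManager must also hold
    // no intermediate data that a later job might still read.
    boolean isSafeToRelease() {
        return runningTasks == 0 && retainedResults.isEmpty();
    }
}
```

With this, a TaskManager whose last task has finished would still answer
"not idle" until the shell session releases the results it holds.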

Cheers,
Aljoscha

On Wed, 19 Aug 2015 at 22:05 Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> I'm sorry for the late reply, I'm still working through a long email
> backlog from a one week vacation ;)
> Thank you for the long reply in this thread. Your observations are very
> good.
> I think we should indeed work towards a more fine-grained "intra-job"
> elasticity. Let me comment on some of your statements below ...
>
> >   * The taskManagers that are being killed off may have resources that
> > are needed by other tasks so they can't always be killed off
> > (files/intermediate results etc). This means that there needs to be some
> > sort of "are you idle?" handshake that needs to be done.
>
>
> I think we have to distinguish between streaming and batch API jobs
> here.
> - For deployed streaming jobs, it's usually impossible to take away
> TaskManagers, because we are working on infinite streams (tasks are never
> done). The simplest thing we can do is to stop machines to which no tasks
> are deployed.
> As Stephan mentioned, dynamic scaling of streaming jobs is certainly
> something interesting for the future. There, we would need a component
> which is implementing some sort of scaling policy (for example based on
> throughput, load or latency). For up or down scaling, we would then
> redeploy a job. For this feature, we certainly need nicely abstracted APIs
> for YARN and Mesos to alter the running cluster.
> - For batch jobs, which are usually executed in a pipelined (= streaming)
> fashion, we would need to execute them in a batch fashion. (Otherwise,
> tasks do not finish one after another.) Flink's runtime already has support
> for that. With some additional logic that allows us to recognize when an
> intermediate dataset has been fully consumed by downstream tasks, we can
> safely deallocate machines in a Flink cluster. I think such logic can be
> implemented in the JobManager's scheduler.
>
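
The "fully consumed" logic mentioned above could be as simple as counting
the downstream consumers that are still reading each intermediate dataset.
A rough sketch follows; ConsumptionTracker and its methods are hypothetical
names for illustration, not part of Flink's scheduler.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker; not an existing Flink class.
class ConsumptionTracker {
    // Intermediate dataset ID -> number of downstream consumers not yet done.
    private final Map<String, Integer> pendingConsumers = new HashMap<>();

    // Register a dataset together with the number of tasks that will read it.
    void register(String datasetId, int consumerCount) {
        if (consumerCount > 0) {
            pendingConsumers.put(datasetId, consumerCount);
        }
    }

    // Called when one downstream task has finished reading the dataset.
    // Returns true once the last consumer is done.
    boolean consumerFinished(String datasetId) {
        Integer remaining = pendingConsumers.get(datasetId);
        if (remaining == null) {
            throw new IllegalArgumentException("unknown dataset: " + datasetId);
        }
        if (remaining <= 1) {
            pendingConsumers.remove(datasetId);
            return true;
        }
        pendingConsumers.put(datasetId, remaining - 1);
        return false;
    }

    // A dataset with no pending consumers (or one never registered) counts
    // as fully consumed.
    boolean isFullyConsumed(String datasetId) {
        return !pendingConsumers.containsKey(datasetId);
    }
}
```

A machine would only become a candidate for deallocation once every dataset
it holds reports as fully consumed.
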
> > * Ideally, we would want to isolate the logic (a general scheduler) that
> > says "get me a slot meeting constraints X" into one module which utilises
> > another module (Yarn or Mesos) that takes such a request and satisfies
> > the needs of the former. This idea is sort of inspired by the way this
> > separation exists in Apache Spark and seems to work out well.
>
>
> The JobManager of Flink has a component which is scheduling a job graph in
> the cluster. I think right now the system assumes that a certain number of
> machines and processing slots are available.
> But it should not be too difficult to have something like "fake" machines
> and slots there which are allocated on demand as needed (so that you
> basically give the system an upper limit of resources to allocate).
>
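
As a sketch of the "upper limit plus on-demand allocation" idea: the
scheduler side could own a pool that only asks the cluster manager for a
slot when one is actually requested. ResourceProvider and ElasticSlotPool
are hypothetical names for illustration, not existing Flink, YARN or Mesos
APIs, and the only constraint modelled here is memory.

```java
import java.util.Optional;

// Hypothetical interface that a YARN or Mesos backend could implement.
interface ResourceProvider {
    // Ask the cluster manager for one more slot; false means it was refused.
    boolean requestSlot(int memoryMb);
}

// Hypothetical scheduler-side pool with a fixed upper limit of slots.
class ElasticSlotPool {
    private final ResourceProvider provider;
    private final int maxSlots;
    private int allocatedSlots = 0;

    ElasticSlotPool(ResourceProvider provider, int maxSlots) {
        this.provider = provider;
        this.maxSlots = maxSlots;
    }

    // "Get me a slot meeting constraints X". Returns the index of the new
    // slot, or empty if the limit is reached or the backend refused.
    Optional<Integer> acquireSlot(int memoryMb) {
        if (allocatedSlots >= maxSlots || !provider.requestSlot(memoryMb)) {
            return Optional.empty();
        }
        return Optional.of(allocatedSlots++);
    }
}
```

The scheduler never sees whether YARN or Mesos sits behind the provider,
which is the separation discussed in this thread.
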
> I agree with Stephan that a good first step for fine-grained elasticity
> would be a common interface for both YARN and Mesos.
> For YARN, there are currently these (pretty YARN-specific) abstract
> classes:
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
>
> I'd suggest that we first merge the "flink-mesos" integration once it's
> done. After that, we can try to come up with a common interface.
>
> Are you interested in working towards that feature after the "flink-mesos"
> integration?
>
> Best,
> Robert
>
>
> On Tue, Aug 11, 2015 at 10:22 AM, ankurcha <an...@malloc64.com> wrote:
>
> > Hi Stephan / others interested,
> >
> > I have been working on the flink-mesos integration and there are
> > definitely some thoughts that I would like to share about the
> > commonalities with the flink-yarn integration.
> >
> > * Both the flink-mesos and flink-yarn integrations, as they stand today,
> > can be considered "coarse-grained" scheduling integrations. This means
> > that the tasks that are spawned (the task managers) are long-lived.
> >
> > * MaGuoWei is referring to something (as correctly identified by Stephan)
> > that I like to call "fine-grained" scheduling integration, where the task
> > managers are relinquished by the framework when they aren't being
> > utilised by Flink. This means that when the next job is executed, the job
> > manager and/or framework will spawn new task managers. This also has an
> > implied requirement that each taskManager runs one task and is then
> > discarded.
> >
> > * Coarse-grained scheduling is preferable when we want interactive
> > (sub-second) response times and cannot afford to wait for a resource
> > offer to be accepted and a new taskManager JVM to spin up. The downside
> > is that long-running tasks may lead to underutilisation of the shared
> > cluster.
> >
> > * Fine-grained scheduling is preferable when a little delay (due to
> > starting a new taskManager JVM) is acceptable. This means that we will
> > have higher utilisation of the cluster in a shared setting as resources
> > that aren't being used are relinquished. But we need to be a lot more
> > thorough about this approach. Some of the cases that I can think of are:
> >   * The jobManager/integration-framework may need to monitor the
> > utilisation of the taskManagers and kill off taskManagers based on some
> > cool-down timeout.
> >   * The taskManagers that are being killed off may have resources that
> > are needed by other tasks so they can't always be killed off
> > (files/intermediate results etc). This means that there needs to be some
> > sort of "are you idle?" handshake that needs to be done.
> >   * I like "fine-grained" mode but there may need to be a middle ground
> > where tasks are "coarse-grained", i.e. run multiple operators, and once
> > idle for a certain amount of time, they are reaped/killed off by the
> > jobManager/integration-framework.
> >
> > * Ideally, we would want to isolate the logic (a general scheduler) that
> > says "get me a slot meeting constraints X" into one module which utilises
> > another module (Yarn or Mesos) that takes such a request and satisfies
> > the needs of the former. This idea is sort of inspired by the way this
> > separation exists in Apache Spark and seems to work out well.
> >
> > * I don't know the codebase well enough to say where these things go.
> > Based on my reading of the overall architecture of the system, there is
> > nothing that can't be satisfied by the flink-runtime, and it *should* not
> > need any detailed access to the execution plan. I'll defer this to
> > someone who knows the internals better.
> >
> >
> >
> > --
> > View this message in context:
> >
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/add-some-new-api-to-the-scheduler-in-the-job-manager-tp7153p7448.html
> > Sent from the Apache Flink Mailing List archive at Nabble.com.
> >
>
