Hi Ben,

I think that's a good question, but I also think that Eron's answer is sufficient for an initial implementation. We increasingly suggest using a single "cluster" per job (either a per-job YARN cluster, or one on Kubernetes or Mesos), so we don't really have to solve the problem of efficiently supporting multiple jobs per JobManager.

Regarding job lifetime vs. task lifetime: the open() and close() methods that Eron mentioned are, for practical purposes, invoked at the beginning and end of a job. For example, when a streaming pipeline starts, the tasks corresponding to its operators are launched and stay around until the job is finished. I think this is different from a system such as Google Dataflow, where you will see many more small tasks over the life of a streaming job, so it shouldn't be that big of a problem for Flink.
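For illustration, here is a minimal sketch of what that could look like: an expensive external service started in open() and torn down in close(), with a static reference count so that all task slots of the job in the same TaskManager JVM share one instance. ExternalService and its start()/process()/shutdown() methods are hypothetical placeholders for the Beam user container and its RPC endpoint, not an existing Flink or Beam API.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Sketch only: tie an expensive external service to task open()/close().
public class ServiceBackedMapper extends RichMapFunction<String, String> {

    // Shared per TaskManager JVM, reference-counted so that all task slots
    // running this job reuse a single service instance.
    private static ExternalService sharedService;
    private static int refCount = 0;

    private transient ExternalService service;

    @Override
    public void open(Configuration parameters) throws Exception {
        synchronized (ServiceBackedMapper.class) {
            if (refCount++ == 0) {
                // Expensive startup (e.g. launching the user container) happens
                // only once per JVM, roughly at the start of the job.
                sharedService = ExternalService.start();
            }
            service = sharedService;
        }
    }

    @Override
    public String map(String value) throws Exception {
        // Per-record call to the external endpoint.
        return service.process(value);
    }

    @Override
    public void close() throws Exception {
        synchronized (ServiceBackedMapper.class) {
            if (--refCount == 0) {
                // Last task of this job on this TaskManager: tear the service down.
                sharedService.shutdown();
                sharedService = null;
            }
        }
    }

    /** Hypothetical handle for the external container / RPC endpoint. */
    public static class ExternalService {
        static ExternalService start() { /* launch container, wait until ready */ return new ExternalService(); }
        String process(String value) { /* RPC to user code */ return value; }
        void shutdown() { /* stop container and free resources */ }
    }
}

In the Beam case the handle would of course wrap the SDK harness container rather than something created in-process, but the bracketing via open()/close() would be the same.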
What do you think?

Best,
Aljoscha

> On 8. Dec 2017, at 00:40, Eron Wright <eronwri...@gmail.com> wrote:
>
> Could you speak to whether the lifecycle provided by RichFunction
> (open/close) would fit the requirement?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/api/common/functions/RichFunction.html#open-org.apache.flink.configuration.Configuration-
>
> On Thu, Dec 7, 2017 at 1:57 PM, Ben Sidhom <sid...@google.com.invalid>
> wrote:
>
>> Hey,
>>
>> I'm working on the Apache Beam <https://beam.apache.org/> portability
>> story and trying to figure out how we can get the Flink runner to
>> support the new portability API
>> <https://beam.apache.org/contribute/portability/>.
>>
>> In order to get the runner to work with portable SDKs, we need to be
>> able to spin up and manage user containers from the TaskManagers
>> themselves. All communication with user code (effectively user-defined
>> functions) happens over RPC endpoints between the container and the
>> Flink worker threads. Unfortunately, we cannot assume that the container
>> images themselves are small or that they are cheap to start up. For this
>> reason, we cannot reasonably start and stop these external services once
>> per task (e.g., by wrapping service lifetimes within mapPartitions). In
>> order to support multiple jobs per JVM (either due to multiple task
>> slots per manager or multiple jobs submitted to a cluster serially), we
>> need to know when to clean up resources associated with a particular job.
>>
>> Is there a way to do this in user code? Ideally, this would be something
>> like a set of per-job startup and shutdown hooks that execute on each
>> TaskManager that a particular job runs on. If this does not currently
>> exist, how reasonable would it be to introduce client-facing APIs that
>> would allow it? Is there a better approach for this lifecycle management
>> that better fits into the Flink execution model?
>>
>> Thanks
>> --
>> -Ben