Hey, I'm working on the Apache Beam <https://beam.apache.org/> portability story and trying to figure out how we can get the Flink runner to support the new portability API <https://beam.apache.org/contribute/portability/>.
To make the runner work with portable SDKs, we need to be able to spin up and manage user containers from the TaskManagers themselves. All communication with user code (effectively user-defined functions) happens over RPC endpoints between the container and the Flink worker threads. Unfortunately, we cannot assume that the container images are small or cheap to start, so we cannot reasonably start and stop these external services once per task (e.g., by wrapping service lifetimes within mapPartitions).

To support multiple jobs per JVM (whether due to multiple task slots per TaskManager or multiple jobs submitted to a cluster serially), we need to know when to clean up the resources associated with a particular job. Is there a way to do this in user code? Ideally, this would be something like a set of per-job startup and shutdown hooks that execute on each TaskManager that a particular job runs on. If this does not currently exist, how reasonable would it be to introduce client-facing APIs that allow it? Or is there a better approach to this lifecycle management that fits the Flink execution model more naturally?

Thanks,
-Ben
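P.S. For concreteness, here is a rough sketch of the kind of user-code workaround I have in mind today, absent real per-job hooks: a static, reference-counted registry keyed by job ID, with acquire/release called from each task's open()/close() (as in RichFunction), so the expensive service starts once per (job, JVM) and is torn down when the last task on that TaskManager releases it. All class and method names here are hypothetical, not an existing Flink or Beam API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch: one expensive external service per (job, JVM). */
public final class JobResourceRegistry {

    private static final class Entry {
        final AutoCloseable service;
        int refCount;
        Entry(AutoCloseable service) { this.service = service; }
    }

    private static final Map<String, Entry> ENTRIES = new HashMap<>();

    /** Call from each task's open(): starts the service on the first acquire in this JVM. */
    public static synchronized AutoCloseable acquire(
            String jobId, Supplier<AutoCloseable> starter) {
        Entry e = ENTRIES.computeIfAbsent(jobId, id -> new Entry(starter.get()));
        e.refCount++;
        return e.service;
    }

    /** Call from each task's close(): tears the service down when the last task releases it. */
    public static synchronized void release(String jobId) {
        Entry e = ENTRIES.get(jobId);
        if (e != null && --e.refCount == 0) {
            ENTRIES.remove(jobId);
            try {
                e.service.close();
            } catch (Exception ex) {
                throw new RuntimeException("failed to stop service for job " + jobId, ex);
            }
        }
    }
}
```

The obvious weakness, and the reason I'm asking about proper hooks, is that close() is not guaranteed to run on every failure path, so the container can leak; a runner-provided per-job shutdown hook would not have that problem.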