Hi, thanks for bringing this up.
The design looks very nice to me, in that:

1. In the new per-job mode, we don't need to compile user programs on the client and can directly run user programs with user jars. That makes resource isolation easier on multi-tenant platforms and is much safer.

2. The execution of user programs can be unified across session and per-job modes: in session mode, user jobs are submitted via a remote ClusterClient, while in per-job mode they are submitted via a local ClusterClient.

Regards,
Xiaogang

tison <wander4...@gmail.com> wrote on Wed, Oct 30, 2019 at 3:30 PM:

> (CC'ing the user list because I think users may have ideas on how per-job mode should look.)
>
> Hi all,
>
> In the discussion about Flink on K8s[1] we encountered a problem: opinions diverge on how the so-called per-job mode should work. This thread is aimed at starting a dedicated discussion about per-job semantics and how to implement them.
>
> **The AS IS per-job mode**
>
> * In standalone deployment, we bundle the user jar with the Flink jar, retrieve the JobGraph (the very first JobGraph from the user program on the classpath), and then start a Dispatcher with this JobGraph preconfigured, which launches it as a "recovered" job.
>
> * In YARN deployment, we accept the submission via CliFrontend, extract the JobGraph (the very first JobGraph from the submitted user program), serialize the JobGraph and upload it to YARN as a resource, and then, when the AM starts, retrieve the JobGraph from that resource and start a Dispatcher with this JobGraph preconfigured; the rest is the same.
>
> Specifically, in order to support multi-part jobs, if the YARN deployment is configured as "attached", it starts a session cluster, monitors the progress, and shuts down the cluster when the job finishes.
>
> **Motivation**
>
> The implementation mentioned above, however, suffers from problems. The two major ones are: 1. only the very first JobGraph from the user program is respected; 2. the job is compiled on the client side.
>
> 1. Only the very first JobGraph from the user program is respected
>
> There is already an issue about this topic[2]. Since we extract the JobGraph from the user program by hijacking Environment#execute, we actually abort any execution after the first call to #execute. It has surprised users many times that some logic they write in the program is possibly never executed. The underlying problem is the semantics of "job" from Flink's perspective: I'd say that in the current implementation "per-job" is actually "per-job-graph", whereas in practice, since we support jar submission, "per-program" semantics are what users want.
>
> 2. The job is compiled on the client side
>
> Standalone deployment is not affected, but in YARN deployment we compile the job and build the JobGraph on the client side, and then upload it to YARN. This approach, however, somewhat breaks isolation. We have observed user programs with exception-handling logic that calls System.exit in the main method, so compiling the job exits the whole client at once. This is a critical problem if we manage multiple Flink jobs on a single platform: in that case, it shuts down the whole service.
>
> Besides, I have been asked many times why per-job mode doesn't run "just like" session mode but with a dedicated cluster. That might imply that the current implementation mismatches users' demands.
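To make the two problems above concrete, here is a minimal, purely illustrative user program (class name and job logic are made up, not taken from the thread). Under the current per-job mode only the first JobGraph is extracted, so the second execute() never runs; and because the main method runs inside the client JVM during compilation, the System.exit(1) in the catch block would terminate the submitting client itself.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UserProgram {
        public static void main(String[] args) {
            try {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();

                env.fromElements(1, 2, 3).print();
                // Problem 1: only this first JobGraph is respected in the current
                // per-job mode; everything after this call is never executed.
                env.execute("first job");

                env.fromElements("a", "b", "c").print();
                env.execute("second job"); // never reached today

            } catch (Exception e) {
                // Problem 2: the program is compiled in the client JVM, so this
                // call exits the whole submitting client process, not just this job.
                System.exit(1);
            }
        }
    }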
> **Proposal**
>
> In order to provide a "per-program" mode which acts "just like" session mode but with a dedicated cluster, I propose the workflow below. It acts like starting a driver on the cluster, but it is not a general driver solution as proposed here[3]; the main purpose of the workflow below is to provide "per-program" semantics.
>
> *From CliFrontend*
>
> 1. CliFrontend receives the submission, gathers all configuration and starts a corresponding ClusterDescriptor.
>
> 2. The ClusterDescriptor deploys a cluster with main class ProgramClusterEntrypoint, shipping resources including the user program.
>
> 3. ProgramClusterEntrypoint#main contains the logic for starting components, including a standalone Dispatcher, configuring the user program to use an RpcClusterClient, and then invoking the main method of the user program.
>
> 4. RpcClusterClient acts like MiniClusterClient: it is able to submit the JobGraph after the leader is elected, so that we neither fall back to round-robin nor fail the submission because no leader is known.
>
> 5. Whether or not the job result is delivered depends on the user program's logic, since we can already get a JobClient from execute. ProgramClusterEntrypoint exits when the user program exits and all submitted jobs have globally terminated.
>
> This fits the direction of FLIP-73, because the strategy of starting an RpcClusterClient can be regarded as a special Executor. After ProgramClusterEntrypoint#main starts a cluster, it generates and passes configuration to the user program so that, when the Executor is created, it knows to use an RpcClusterClient for submission and knows the address of the Dispatcher.
>
> **Compatibility**
>
> In my opinion this mode can be purely an add-on to the current codebase. We don't actually replace the current per-job mode with the so-called "per-program" mode. It may turn out that the current per-job mode becomes useless once we have such a "per-program" mode, so we could possibly deprecate it in favor of the other.
>
> I'm glad to discuss more details if you're interested, but let's first reach a consensus on the overall design :-)
>
> Looking forward to your reply!
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9953
> [2] https://issues.apache.org/jira/browse/FLINK-10879
> [3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
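Purely as an illustration of step 3 of the proposed workflow, the sketch below shows one possible shape of ProgramClusterEntrypoint#main. ProgramClusterEntrypoint and RpcClusterClient are only the names used in the proposal, not existing Flink classes, and all helper methods, the DispatcherHandle type, and the configuration values here are placeholders, not real API.

    import org.apache.flink.configuration.Configuration;

    // Sketch only; names follow the proposal above and are not existing Flink API.
    public final class ProgramClusterEntrypoint {

        public static void main(String[] args) throws Exception {
            // Configuration shipped by the ClusterDescriptor in step 2.
            Configuration configuration = loadConfiguration(args);

            // Step 3: start the cluster components, including a standalone
            // Dispatcher, without any pre-configured JobGraph.
            DispatcherHandle dispatcher = startDispatcher(configuration);

            // Let the FLIP-73 Executor created inside the user program know that
            // submissions go through an RpcClusterClient against this Dispatcher.
            configuration.setString("execution.target", "program-cluster");      // placeholder value
            configuration.setString("dispatcher.address", dispatcher.address()); // placeholder key

            // Steps 4 and 5: invoke the user's main method; each execute() then
            // submits via the RpcClusterClient, which waits for leader election
            // in the same spirit as MiniClusterClient.
            runUserMain(configuration, args);

            // Exit once the user program returns and all submitted jobs have
            // reached a globally terminal state.
            dispatcher.awaitAllJobsTerminated();
        }

        // Placeholders so the sketch is self-contained; a real implementation
        // would use the actual cluster-entrypoint and program-invocation utilities.
        private static Configuration loadConfiguration(String[] args) { return new Configuration(); }
        private static DispatcherHandle startDispatcher(Configuration conf) { throw new UnsupportedOperationException("sketch"); }
        private static void runUserMain(Configuration conf, String[] args) { throw new UnsupportedOperationException("sketch"); }

        interface DispatcherHandle {
            String address();
            void awaitAllJobsTerminated();
        }
    }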