Hi

Thanks for bringing this up.

The design looks very nice to me in that:
1. In the new per-job mode, we don't need to compile user programs in the
client and can directly run user programs with user jars. That makes
resource isolation in multi-tenant platforms easier, and it is much safer.
2. The execution of user programs can be unified in session and per-job
modes. In session mode, user jobs are submitted via a remote ClusterClient
while in per-job mode user jobs are submitted via a local ClusterClient.

Regards,
Xiaogang

tison <wander4...@gmail.com> wrote on Wed, Oct 30, 2019 at 3:30 PM:

> (CC user list because I think users may have ideas on what per-job mode
> should look like)
>
> Hi all,
>
> In the discussion about Flink on k8s[1] we encountered the problem that
> opinions diverge on how the so-called per-job mode works. This thread is
> aimed at starting a dedicated discussion about per-job semantics and how
> to implement them.
>
> **The AS IS per-job mode**
>
> * in standalone deployment, we bundle the user jar with the Flink jar,
> retrieve the JobGraph, which is the very first JobGraph from the user
> program on the classpath, and then start a Dispatcher with this JobGraph
> preconfigured, which launches it as a "recovered" job.
>
> * in YARN deployment, we accept the submission via CliFrontend, extract
> the JobGraph, which is the very first JobGraph from the submitted user
> program, serialize the JobGraph and upload it to YARN as a resource, and
> then, when the AM starts, retrieve the JobGraph as a resource and start a
> Dispatcher with this JobGraph preconfigured. The rest is the same as in
> the standalone case.
>
> As a special case, in order to support jobs with multiple parts, a YARN
> deployment configured as "attached" starts a SessionCluster, proceeds with
> the program and shuts down the cluster once the job has finished.
>
> **Motivation**
>
> The implementation mentioned above, however, suffers from problems. The
> two major ones are:
> 1. it only respects the very first JobGraph from the user program;
> 2. it compiles the job on the client side.
>
> 1. Only respect the very first JobGraph from user program
>
> There is already an issue about this topic[2]. Because we extract the
> JobGraph from the user program by hijacking Environment#execute, we
> actually abort any execution after the first call to #execute. Besides
> surprising users many times, since any logic they write in the program
> after that call is possibly never executed, the underlying problem is the
> semantics of "job" from Flink's perspective. I'd say that in the current
> implementation "per-job" is actually "per-job-graph". In practice,
> however, since we support jar submission, what is wanted is "per-program"
> semantics.
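
To make the first problem concrete, here is a minimal sketch in plain Java.
It is not Flink code: ExtractingEnvironment, ProgramAbort and userMain are
hypothetical stand-ins, and the "JobGraph" is just a job name. It only
assumes the mechanism described above, i.e. that execute() is hijacked to
capture the job and then unwinds the user program.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; none of these are real Flink classes.
public class FirstExecuteOnlyDemo {

    /** Thrown to unwind the user program once a job has been captured. */
    static final class ProgramAbort extends RuntimeException {}

    /** Environment used during extraction: execute() captures, then aborts. */
    static final class ExtractingEnvironment {
        final List<String> captured = new ArrayList<>();

        void execute(String jobName) {
            captured.add(jobName);    // record the "JobGraph"
            throw new ProgramAbort(); // stop the user program right here
        }
    }

    /** A user program that submits two jobs with some logic in between. */
    static void userMain(ExtractingEnvironment env, List<String> sideEffects) {
        env.execute("job-1");
        sideEffects.add("post-processing"); // not reached during extraction
        env.execute("job-2");               // not reached either
    }

    /** Runs the user program as the extraction step would; returns the captured jobs. */
    static List<String> extract(List<String> sideEffects) {
        ExtractingEnvironment env = new ExtractingEnvironment();
        try {
            userMain(env, sideEffects);
        } catch (ProgramAbort expected) {
            // the first execute() call ended the program
        }
        return env.captured;
    }

    public static void main(String[] args) {
        List<String> sideEffects = new ArrayList<>();
        System.out.println("captured = " + extract(sideEffects));
        System.out.println("side effects = " + sideEffects);
    }
}
```

Only job-1 is captured and the side-effect list stays empty, which is the
"per-job-graph" behaviour rather than the "per-program" one.
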
>
> 2. Compile job in client side
>
> Well, standalone deployment is not affected. But in YARN deployment, we
> compile the job and get the JobGraph on the client side, and then upload
> it to YARN. This approach, however, somewhat breaks isolation. We have
> observed user programs containing exception handling logic that calls
> System.exit in the main method, so that compiling the job exits the whole
> client at once. It is a critical problem if we manage multiple Flink jobs
> on a single platform: in this case, it shuts down the whole service.
>
> Besides, I have been asked many times why per-job mode doesn't run
> "just like" session mode but with a dedicated cluster. It might imply that
> the current implementation mismatches users' demands.
>
> **Proposal**
>
> In order to provide a "per-program" semantic mode which acts "just like"
> session mode but with a dedicated cluster, I propose the workflow below.
> It acts like starting a driver on the cluster but is not a general driver
> solution as proposed here[3]; the main purpose of the workflow below is
> to provide a "per-program" semantic mode.
>
> *From CliFrontend*
>
> 1. CliFrontend receives submission, gathers all configuration and starts a
> corresponding ClusterDescriptor.
>
> 2. ClusterDescriptor deploys a cluster with main class
> ProgramClusterEntrypoint while shipping resources including the user
> program.
>
> 3. ProgramClusterEntrypoint#main contains logic for starting components
> including a Standalone Dispatcher, configuring the user program to start
> a RpcClusterClient, and then invoking the main method of the user program.
>
> 4. RpcClusterClient acts like MiniClusterClient, which is able to submit
> the JobGraph after a leader is elected, so that we don't fall back to
> round-robin or fail the submission due to no leader.
>
> 5. Whether or not the job result is delivered depends on the user
> program's logic, since we can already get a JobClient from execute.
> ProgramClusterEntrypoint exits when the user program exits and all
> submitted jobs have globally terminated.
>
> This fits the direction of FLIP-73, because the strategy of starting a
> RpcClusterClient can be regarded as a special Executor. After
> ProgramClusterEntrypoint#main starts a cluster, it generates configuration
> and passes it to the user program, so that when the Executor is generated,
> it knows to use a RpcClusterClient for submission and knows the address of
> the Dispatcher.
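
The entrypoint flow in steps 3 to 5 could be modelled roughly as below. This
is a plain-Java sketch under loud assumptions, not the proposed
implementation: Dispatcher, Environment and runProgram are hypothetical
stand-ins, the "dispatcher.address" key and the "localhost:6123" value are
placeholders, and the resolver function merely imitates looking up the
Dispatcher by the address found in the configuration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the proposed flow; not Flink code.
public class ProgramEntrypointSketch {

    /** Stand-in for the Dispatcher started inside the cluster entrypoint. */
    static final class Dispatcher {
        final String address;
        final List<String> submitted = new ArrayList<>();

        Dispatcher(String address) {
            this.address = address;
        }
    }

    /** Environment seen by the user program: execute() submits and returns. */
    static final class Environment {
        private final Dispatcher target;

        Environment(Map<String, String> config, Function<String, Dispatcher> resolver) {
            // Imitates a local client locating the Dispatcher from configuration.
            this.target = resolver.apply(config.get("dispatcher.address"));
        }

        void execute(String jobName) {
            target.submitted.add(jobName); // submit; the program keeps running
        }
    }

    /** Starts the "cluster", runs the user program in it, returns all submitted jobs. */
    static List<String> runProgram(Consumer<Environment> userMain) {
        Dispatcher dispatcher = new Dispatcher("localhost:6123"); // placeholder address
        Map<String, String> config = new HashMap<>();
        config.put("dispatcher.address", dispatcher.address);     // passed to the program
        Environment env = new Environment(
                config, addr -> addr.equals(dispatcher.address) ? dispatcher : null);
        userMain.accept(env);        // invoke the user program's main logic
        return dispatcher.submitted; // entrypoint finishes after the program returns
    }

    public static void main(String[] args) {
        List<String> sideEffects = new ArrayList<>();
        List<String> jobs = runProgram(env -> {
            env.execute("job-1");
            sideEffects.add("post-processing"); // runs, unlike in client-side extraction
            env.execute("job-2");
        });
        System.out.println("submitted = " + jobs);
        System.out.println("side effects = " + sideEffects);
    }
}
```

In this model every execute() call reaches the Dispatcher and the logic
between the calls runs as written, which is the "per-program" semantics the
proposal is after.
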
>
> **Compatibility**
>
> In my opinion this mode can be purely an add-on to the current codebase.
> We don't actually replace the current per-job mode with the so-called
> "per-program" mode. It may turn out that the current per-job mode becomes
> redundant once we have such a "per-program" mode, in which case we could
> deprecate it in favor of the new one.
>
> I'm glad to discuss this in more detail if you're interested, but I'd say
> we'd better first reach a consensus on the overall design :-)
>
> Looking forward to your reply!
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9953
> [2] https://issues.apache.org/jira/browse/FLINK-10879
> [3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
>
