To be honest, I originally wanted to start a separate thread first to discuss what per-job mode means, because things differ considerably depending on whether per-job mode contains exactly one JobGraph or is allowed to have multiple parts. Add the question of whether or not we support post-execution logic, and it becomes even less clear what per-job mode looks like from the user's perspective.
But the original purpose here is to arrive at a concrete PerJobExecutor, and I want to save bandwidth by reducing the number of concurrent, coupled threads a bit.

On Wed, Oct 2, 2019 at 5:33 PM Zili Chen <wander4...@gmail.com> wrote:

> Hi Till,
>
> I posted the thoughts above here because FLIP-73 is unclear on how to
> achieve PerJobExecutor. To discuss this topic, it is necessary to
> clarify how per-job mode should run, regardless of how it works now.
>
> With PerJobExecutor called in the Environment, I don't think we can
> still keep the current logic. If we kept the current logic, it would
> look like this:
>
> 1. env.execute calls executor.execute
> 2. the executor gets the current job graph and deploys a job cluster
> 3. for the remaining parts, shall we deploy a new job cluster? Reuse the
>    previous job cluster? Or, as in the current logic, abort on the first
>    submission?
>
> These questions should be answered to clarify what PerJobExecutor is and
> how it works.
>
> Best,
> tison
>
>
> On Wed, Oct 2, 2019 at 5:19 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> I'm not sure whether removing the current per-job mode semantics
>> altogether is a good idea. It has some nice properties, for example the
>> JobGraph stays constant. With your proposal, which I would coin the
>> driver mode, the JobGraph would be regenerated in case of a failover.
>> Depending on the user code logic, this could generate a different
>> JobGraph.
>>
>> Aren't we unnecessarily widening the scope of this FLIP here? Wouldn't it
>> be possible to introduce the Executors without changing Flink's deployment
>> options in the first step? I don't fully understand where this
>> need/requirement comes from.
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 2, 2019 at 10:58 AM Zili Chen <wander4...@gmail.com> wrote:
>>
>>> Thanks for your thoughts Kostas!
>>>
>>> I agree that the Executor is a concept on the client side now, and I
>>> sincerely second the description
>>>
>>> Now the Executor simply uses a client, e.g. a ClusterClient, to submit
>>> the job (JobGraph) that it will create from the user program.
>>> In that sense, the Executor is one level of abstraction above the
>>> clients, as it adds more functionality and it uses the one offered by
>>> the client.
>>>
>>> In fact, let's think about the statement that an Executor simply uses a
>>> client to submit the job. I'd like to describe how job submission could
>>> work in per-job mode so that it follows a similar view and
>>>
>>> (1) runs the client on the cluster side @Stephan Ewen <se...@apache.org>
>>> (2) supports multi-part per-job programs, so that we don't have to hack
>>> a fallback to a session cluster in this case
>>> @Till Rohrmann <trohrm...@apache.org>
>>>
>>> Let's start with an example in which we submit a user program via the
>>> CLI in per-job mode.
>>>
>>> 1) The CLI generates the configuration that holds all information about
>>> the deployment.
>>> 2) The CLI deploys a job cluster *with user jars* and specially marks
>>> the jar that contains the user program.
>>> 3) JobClusterEntrypoint takes care of bootstrapping the Flink cluster
>>> and executes the user program, respecting all configuration passed from
>>> the client.
>>> 4) The user program now runs on the cluster side. It starts executing
>>> its main method and gets an environment with information about the
>>> associated job cluster. Since the cluster has already started, it can
>>> submit the job to that cluster just as it would to a session cluster.
>>> 5) The job cluster shuts down when the user program exits *and* the
>>> Dispatcher doesn't maintain any jobs.
>>>
>>> Since we actually run the client on the cluster side, we can execute
>>> multi-part programs, because we submit the parts to the local cluster
>>> one by one. And because we change the process from
>>>
>>> - start a per-job cluster with a job graph
>>>
>>> to
>>>
>>> + start a per-job cluster with a user program
>>>
>>> we run the client on the cluster side. This avoids "extracting" the job
>>> graph from the user program, which limits multi-part programs and
>>> doesn't respect user logic outside of Flink-related code.
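
As a rough illustration of steps 3) to 5) above, here is a minimal Java sketch. It is a toy model and not Flink code: `JobClusterSketch`, `LocalCluster`, `submitAndAwait` and `runUserProgram` are hypothetical names, jobs are modelled as plain strings, and every job is assumed to finish as soon as it is submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of "start a per-job cluster with a user program". Not Flink API. */
public class JobClusterSketch {

    /** Hypothetical stand-in for the cluster and its Dispatcher. */
    static final class LocalCluster {
        final List<String> submitted = new ArrayList<>();
        private final List<String> running = new ArrayList<>();
        boolean shutDown = false;

        /** Submits one part and, to keep the model simple, finishes it at once. */
        void submitAndAwait(String jobName) {
            running.add(jobName);
            submitted.add(jobName);
            running.remove(jobName);
        }

        boolean hasRunningJobs() {
            return !running.isEmpty();
        }
    }

    /** The cluster is started with the user program, not with a job graph. */
    static LocalCluster runUserProgram(Consumer<LocalCluster> userMain) {
        LocalCluster cluster = new LocalCluster(); // step 3: bootstrap the cluster
        userMain.accept(cluster);                  // step 4: main() runs cluster-side
        if (!cluster.hasRunningJobs()) {           // step 5: program exited, no jobs left
            cluster.shutDown = true;
        }
        return cluster;
    }

    public static void main(String[] args) {
        LocalCluster cluster = runUserProgram(c -> {
            c.submitAndAwait("part-1");
            // Arbitrary user logic between the parts is kept, since nothing
            // is "extracted" from the program ahead of time.
            c.submitAndAwait("part-2");
        });
        System.out.println(cluster.submitted + " shutDown=" + cluster.shutDown);
    }
}
```

The point of the sketch is only that the program sees an already running cluster and submits its parts one by one, so a multi-part program needs no special handling.
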
>>>
>>> Taking the session scenario into consideration, overall we now have
>>>
>>> 1. ClusterDeployer and its factory, which are SPIs for platform
>>> developers so that they can deploy a job cluster with a user program,
>>> or a session cluster.
>>>
>>> 2. Environment and Executor are unified. The Environment helps describe
>>> the user program logic and internally compiles the job as well as
>>> submits the job with the Executor. The Executor always makes use of a
>>> ClusterClient to submit the job. Specifically, in per-job mode, the
>>> Environment reads the configuration refined by the job cluster so that
>>> it knows how to generate a ClusterClient.
>>>
>>> 3. Platform developers get a ClusterClient as the return value of the
>>> deploy method of ClusterDeployer, or retrieve one from an existing,
>>> publicly known session cluster (via a ClusterRetriever, or by extending
>>> ClusterDeployer into another, more general concept).
>>>
>>> 4. JobClient can be used by user program writers or platform developers
>>> to manage jobs under different conditions.
>>>
>>> There are many other refactorings we can do to respect this
>>> architecture, but let's re-emphasize the key difference
>>>
>>> ** The job cluster doesn't start with a job graph anymore but starts
>>> with a user program, and it runs the program in the same place as the
>>> cluster runs. So for the program, it is no different from a session
>>> cluster. It is just an existing cluster. **
>>>
>>> Best,
>>> tison.
>>>
>>
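
To make the relationship between the four pieces listed above a bit more concrete, here is a small Java sketch. The names ClusterDeployer, ClusterClient, Executor, Environment and JobClient come from the discussion, but every signature below is an assumption made for illustration and does not match the actual Flink classes; `ArchitectureSketch` and `InMemoryDeployer` are hypothetical, and a job graph is modelled as a plain string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative wiring only; the signatures are assumptions, not Flink API. */
public class ArchitectureSketch {

    /** Handle for managing one submitted job. */
    interface JobClient { String jobId(); }

    /** Talks to one running cluster. */
    interface ClusterClient { JobClient submit(String jobGraph); }

    /** SPI for platform developers: deploys a cluster, returns a client to it. */
    interface ClusterDeployer { ClusterClient deploy(Map<String, String> config); }

    /** One level above the client: always submits through a ClusterClient. */
    static final class Executor {
        private final ClusterClient client;
        Executor(ClusterClient client) { this.client = client; }
        JobClient execute(String jobGraph) { return client.submit(jobGraph); }
    }

    /** Describes the program, compiles it, and submits it with the Executor. */
    static final class Environment {
        private final Executor executor;
        private final List<String> operators = new ArrayList<>();
        Environment(Executor executor) { this.executor = executor; }

        Environment add(String operator) {
            operators.add(operator);
            return this;
        }

        JobClient execute(String name) {
            // "Compile" the described program into a (string) job graph.
            String jobGraph = name + ":" + String.join("->", operators);
            operators.clear(); // the next part starts from an empty description
            return executor.execute(jobGraph);
        }
    }

    /** In-memory deployer used only for this demo. */
    static final class InMemoryDeployer implements ClusterDeployer {
        final List<String> received = new ArrayList<>();

        @Override
        public ClusterClient deploy(Map<String, String> config) {
            return jobGraph -> {
                received.add(jobGraph);
                String id = "job-" + received.size();
                return () -> id;
            };
        }
    }

    public static void main(String[] args) {
        InMemoryDeployer deployer = new InMemoryDeployer();
        ClusterClient client = deployer.deploy(Map.of("mode", "session"));
        Environment env = new Environment(new Executor(client));
        JobClient first = env.add("source").add("sink").execute("etl");
        System.out.println(first.jobId() + " " + deployer.received);
    }
}
```

In this reading, the only mode-specific part is how the ClusterClient is obtained (deployed, retrieved, or built from the configuration of the job cluster); the Environment and Executor code path stays the same.
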