Hi All,

Thanks @Tison for starting the discussion. I think we have a scenario very similar to Theo's use cases. In our case we also generate the job graph using a client service (which serves job graph generation for multiple user programs), and we've found that managing the upload/download between the cluster and the DFS is tricky and error-prone. In addition, managing the different environments and requirements of different users in a single service poses even more trouble for us.
However, shifting the job graph generation towards the cluster side also requires some thought regarding how to manage the driver job as well as some dependency conflicts: if we ship the job graph generation to the cluster, some dependencies that are unnecessary at runtime will be pulled in by the driver job (correct me if I'm wrong, Theo). I think in general I agree with @Gyula's main point: unless there is a very strong reason, it is better if we make the driver mode an opt-in (at least at the beginning). I left some comments on the document as well. Please kindly take a look.

Thanks,
Rong

On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina <regina.c...@gs.com> wrote:

> Yeah, just chiming in on this conversation as well. We heavily use multiple job graphs to get isolation around retry logic and resource allocation across the job graphs. Putting all these parallel flows into a single graph would mean sharing TaskManagers across what was meant to be truly independent.
>
> We also build our job graphs dynamically based on the state of the world at the start of the job. While we've had our share of the pain described, my understanding is that there would be a tradeoff in the number of jobs being submitted to the cluster and the corresponding resource allocation requests. In the model with multiple jobs in a program, there's at least the opportunity to reuse idle TaskManagers.
>
> *From:* Theo Diefenthal <theo.diefent...@scoop-software.de>
> *Sent:* Thursday, October 31, 2019 10:56 AM
> *To:* user@flink.apache.org
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
> I agree with Gyula Fora.
>
> In our case, we have a client machine in the middle between our YARN cluster and some backend services, which cannot be reached directly from the cluster nodes. On application startup, we connect to some external systems, get some information crucial for the job runtime, and finally build up the job graph to be submitted.
>
> It is true that we could work around this, but it would be pretty annoying to connect to the remote services, collect the data, upload it to HDFS, start the job, and make sure that housekeeping of those files is also done at some later time.
>
> The current behavior also corresponds to the behavior of Spark's driver mode, which made the transition from Spark to Flink easier for us.
>
> But I see the point, especially with regard to Kubernetes, and would thus also vote for an opt-in solution, with client-side compilation as the default and the per-program mode available as an option.
>
> Best regards
>
> ------------------------------
> *From:* "Flavio Pompermaier" <pomperma...@okkam.it>
> *To:* "Yang Wang" <danrtsey...@gmail.com>
> *CC:* "tison" <wander4...@gmail.com>, "Newport, Billy" <billy.newp...@gs.com>, "Paul Lam" <paullin3...@gmail.com>, "SHI Xiaogang" <shixiaoga...@gmail.com>, "dev" <d...@flink.apache.org>, "user" <user@flink.apache.org>
> *Sent:* Thursday, October 31, 2019 10:45:36
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
> Hi all,
>
> we use multiple jobs in one program a lot, and this is why: when you fetch data from a huge number of sources and, for each source, you do some transformation and then want to write the union of all outputs into a single directory (this assumes you're doing batch).
> When the number of sources is large, if you want to do this in a single job, the graph becomes very big and this is a problem for several reasons:
>
> - too many subtasks/threads per slot
> - increased back pressure
> - if a single "sub-job" fails, the whole job fails; this is very annoying if it happens after half a day, for example
> - in our use case, the big-graph mode takes much longer than running each job separately (but maybe this is true only if you don't have much hardware resources)
> - debugging the cause of a failure can become a daunting task if the job graph is too large
> - we faced many strange errors when trying to run the single big-job mode (due to serialization corruption)
>
> So, summarizing, our overall experience with Flink batch is: the simpler the job graph, the better!
>
> Best,
> Flavio
>
> On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <danrtsey...@gmail.com> wrote:
>
> Thanks to tison for starting this exciting discussion. We also suffer a lot from the per-job mode.
>
> I think the per-job cluster is a dedicated cluster for only one job and will not accept other jobs. It has the advantage of one-step submission: there is no need to start a dispatcher first and then submit the job. And it does not matter where the job graph is generated and the job is submitted. Now we have two cases.
>
> (1) Current YARN detached cluster. The job graph is generated on the client and then shipped to the Flink master container via the distributed cache. The MiniDispatcher uses `FileJobGraphRetriever` to get it. The job will be submitted on the Flink master side.
>
> (2) Standalone per-job cluster. User jars are already built into the image, so the job graph will be generated on the Flink master side and `ClassPathJobGraphRetriever` is used to get it. The job will also be submitted on the Flink master side.
>
> For both (1) and (2), only one job per user program can be supported. Per-job means per job-graph, so it works just as expected.
>
> Tison suggests adding a new mode, "per-program". The user jar will be transferred to the Flink master container, and a local client will be started to generate the job graph and submit the job. I think it could cover all the functionality of the current per-job mode, both (1) and (2). Also, the detached and attached modes could be unified. We would not need to start a session cluster to simulate per-job for multi-part jobs.
>
> I am in favor of the "per-program" mode. Just two concerns.
> 1. How many users are using multiple jobs in one program?
> 2. Why not always use a session cluster to simulate per-job? Maybe one-step submission is a convincing reason.
>
> Best,
> Yang
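For readers less familiar with the multi-job pattern Regina, Flavio and Billy describe, the minimal sketch below illustrates a batch program that submits one job graph per source, in the spirit of "one job graph per table" (the table names, paths and the transformation are made up for illustration). Under the current per-job mode, only the job graph from the first env.execute() call would be respected, which is exactly the limitation discussed in this thread.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PerTableIngestion {

        public static void main(String[] args) throws Exception {
            // Hypothetical set of sources; in practice this is discovered at startup.
            String[] tables = {"orders", "customers", "invoices"};

            for (String table : tables) {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                DataSet<String> cleaned = env
                        .readTextFile("hdfs:///staging/" + table)   // made-up input path
                        .filter(line -> !line.isEmpty());           // placeholder transformation

                cleaned.writeAsText("hdfs:///output/" + table);     // made-up output path

                // Each call to execute() produces and submits its own JobGraph,
                // so a failure in one table's job does not fail the others.
                env.execute("ingest-" + table);
            }
        }
    }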
> tison <wander4...@gmail.com> wrote on Thu, Oct 31, 2019 at 9:18 AM:
>
> Thanks for your attention!
>
> @shixiaoga...@gmail.com <shixiaoga...@gmail.com>
>
> Yes, correct. We try to avoid jobs affecting one another. Also, a local ClusterClient in this case saves the overhead of retrying before the leader is elected and of persisting the JobGraph before submission in RestClusterClient, as well as the network cost.
>
> @Paul Lam <paullin3...@gmail.com>
>
> 1. There is already a note[1] about multi-part jobs. I was also a bit confused about this concept at first :-) Things go the same way if your program contains only one JobGraph, so per-program acts like per-job-graph in that case, which provides compatibility for the many one-job-graph programs.
>
> Besides, we have to respect the user program, which the current implementation doesn't, because we return abruptly when env#execute is called. This hijacks user control so that users cannot deal with the job result or its future. I think this is why we have to add a detach/attach option.
>
> 2. For the compilation part, a workaround could be to upload those resources to a commonly known address such as HDFS, so that compilation can read them from either the client or the cluster.
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>
> Newport, Billy <billy.newp...@gs.com> wrote on Wed, Oct 30, 2019 at 10:41 PM:
>
> We execute multiple job graphs routinely because we cannot submit a single graph without it blowing up. I believe Regina spoke to this in Berlin during her talk. Instead, if we are processing a database ingestion with 200 tables in it, we do one job graph per table rather than a single job graph that handles all tables. A single job graph can be in the tens of thousands of nodes in our largest cases, and we have found Flink (as of 1.3/1.6.4) cannot handle graphs of that size. We're currently testing 1.9.1 but have not retested the large-graph scenario.
>
> Billy
>
> *From:* Paul Lam [mailto:paullin3...@gmail.com]
> *Sent:* Wednesday, October 30, 2019 8:41 AM
> *To:* SHI Xiaogang
> *Cc:* tison; dev; user
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
> Hi,
>
> Thanks for starting the discussion.
>
> WRT the per-job semantics, it looks natural to me that per-job means per-job-graph, because in my understanding the JobGraph is the representation of a job. Could you share some use case in which a user program should contain multiple job graphs?
>
> WRT the per-program mode, I'm also in favor of a unified cluster-side execution for user programs, so +1 from my side.
>
> But I think there may be some value in the current per-job mode: we now have some common resources available on the client machine that are read by the main methods of user programs. If we migrated to per-program mode, we would have to explicitly set the specific resources for each user program and ship them to the cluster, which would be a bit inconvenient. Also, as the job graph is compiled at the client, we can recognize errors caused by user code before starting the cluster and easily get access to the logs.
>
> Best,
> Paul Lam
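On the common resources Paul mentions, the HDFS workaround tison suggests above could look roughly like the sketch below: the main method loads its parameters through Flink's FileSystem abstraction from a location reachable by both the client and the cluster, so job graph compilation works the same on either side (the path and file contents are assumptions made for illustration).

    import java.io.InputStream;
    import java.util.Properties;

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public final class SharedResourceLoader {

        // Loads job parameters from a location reachable from both the client and the
        // cluster, instead of from a file that only exists on the client machine.
        static Properties loadJobProperties() throws Exception {
            Path location = new Path("hdfs:///shared/etl/job-config.properties"); // made-up path
            FileSystem fs = location.getFileSystem();

            Properties props = new Properties();
            try (InputStream in = fs.open(location)) {
                props.load(in);
            }
            return props;
        }
    }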
> On Oct 30, 2019, at 16:22, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>
> Hi,
>
> Thanks for bringing this up.
>
> The design looks very nice to me in that
>
> 1. In the new per-job mode, we don't need to compile user programs in the client and can directly run user programs with user jars. That way, it's easier to achieve resource isolation in multi-tenant platforms and is much safer.
>
> 2. The execution of user programs can be unified in session and per-job modes. In session mode, user jobs are submitted via a remote ClusterClient, while in per-job mode user jobs are submitted via a local ClusterClient.
>
> Regards,
> Xiaogang
>
> tison <wander4...@gmail.com> wrote on Wed, Oct 30, 2019 at 3:30 PM:
>
> (CC user list because I think users may have ideas on what per-job mode should look like)
>
> Hi all,
>
> In the discussion about Flink on k8s[1] we encountered a problem: opinions diverge on how the so-called per-job mode works. This thread is aimed at starting a dedicated discussion about per-job semantics and how to implement them.
>
> **The AS IS per-job mode**
>
> * In standalone deployment, we bundle the user jar with the Flink jar, retrieve the JobGraph (the very first JobGraph from the user program on the classpath), and then start a Dispatcher with this JobGraph preconfigured, which launches it as a "recovered" job.
>
> * In YARN deployment, we accept the submission via CliFrontend, extract the JobGraph (the very first JobGraph from the submitted user program), serialize the JobGraph and upload it to YARN as a resource, and then, when the AM starts, retrieve the JobGraph from that resource and start a Dispatcher with this JobGraph preconfigured; the rest is the same.
>
> Specifically, in order to support multi-part jobs, if the YARN deployment is configured as "attached", it starts a SessionCluster, drives the execution, and shuts down the cluster when the job finishes.
>
> **Motivation**
>
> The implementation mentioned above, however, suffers from problems. The two major ones are 1. only the very first JobGraph from the user program is respected, and 2. the job is compiled on the client side.
>
> 1. Only the very first JobGraph from the user program is respected
>
> There is already an issue about this topic[2]. As we extract the JobGraph from the user program by hijacking Environment#execute, we actually abort any execution after the first call to #execute. Besides, it has surprised users many times that logic they write in the program is possibly never executed; the problem here is the semantics of "job" from Flink's perspective. I'd like to say that in the current implementation "per-job" is actually "per-job-graph". However, in practice, since we support jar submission, it is "per-program" semantics that users want.
>
> 2. Compile the job on the client side
>
> Standalone deployment is not affected. But in YARN deployment, we compile the job and get the JobGraph on the client side, and then upload it to YARN. This approach, however, somewhat breaks isolation. We have observed user programs that contain exception handling logic calling System.exit in the main method, which causes the compilation of the job to exit the whole client at once. This is a critical problem if we manage multiple Flink jobs in a single platform; in that case, it shuts down the whole service.
>
> Besides, there are many times I was asked why per-job mode doesn't run "just like" session mode but with a dedicated cluster. It might imply that the current implementation mismatches users' demands.
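To make problem 2 concrete, a stripped-down example of the kind of user program tison describes is sketched below (paths and job content are made up): when the JobGraph is compiled on the client side, the System.exit in the catch block terminates not just this one submission, but the JVM of whatever service invoked the main method.

    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ExitingUserProgram {

        public static void main(String[] args) {
            try {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                env.readTextFile("hdfs:///input/data")          // made-up input path
                   .filter(line -> !line.isEmpty())
                   .writeAsText("hdfs:///output/data");         // made-up output path

                env.execute("user-job");
            } catch (Exception e) {
                // With client-side compilation this does not only fail this job:
                // it exits the JVM of the service that invoked the main method.
                System.exit(1);
            }
        }
    }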
> **Proposal**
>
> In order to provide a "per-program" semantic mode which acts "just like" session mode but with a dedicated cluster, I propose the workflow below. It acts like starting a driver on the cluster, but it is not a general driver solution as proposed here[3]; the main purpose of the workflow below is to provide a "per-program" semantic mode.
>
> *From CliFrontend*
>
> 1. CliFrontend receives the submission, gathers all configuration and starts a corresponding ClusterDescriptor.
>
> 2. ClusterDescriptor deploys a cluster with main class ProgramClusterEntrypoint while shipping resources including the user program.
>
> 3. ProgramClusterEntrypoint#main contains the logic for starting components including a Standalone Dispatcher, configuring the user program to start an RpcClusterClient, and then invoking the main method of the user program.
>
> 4. RpcClusterClient acts like MiniClusterClient, which is able to submit the JobGraph after the leader is elected, so that we don't fall back to round-robin or fail the submission because there is no leader.
>
> 5. Whether or not the job result is delivered depends on the user program's logic, since we can already get a JobClient from execute. ProgramClusterEntrypoint exits when the user program exits and all globally submitted jobs terminate.
>
> This approach fits the direction of FLIP-73, because the strategy of starting an RpcClusterClient can be regarded as a special Executor. After ProgramClusterEntrypoint#main starts a cluster, it generates and passes configuration to the user program so that, when the Executor is generated, it knows to use an RpcClusterClient for submission and the address of the Dispatcher.
>
> **Compatibility**
>
> In my opinion this mode can be purely an add-on to the current codebase. We don't actually replace the current per-job mode with the so-called "per-program" mode. It may turn out that the current per-job mode becomes useless once we have such a "per-program" mode, so we could possibly deprecate it in favor of the other.
>
> I'm glad to discuss more details if you're interested, but let's say we'd better first reach a consensus on the overall design :-)
>
> Looking forward to your reply!
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9953
> [2] https://issues.apache.org/jira/browse/FLINK-10879
> [3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
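To make the proposed workflow a little more tangible, here is a very rough, hypothetical sketch of what steps 3-5 could look like inside the proposed entrypoint. None of these classes exist in Flink today, and the actual component wiring is reduced to comments because that is exactly what the proposal leaves open; only the "invoke the user main in-process" part is spelled out.

    import java.lang.reflect.Method;

    // Hypothetical sketch of the proposed ProgramClusterEntrypoint; not existing Flink code.
    public class ProgramClusterEntrypointSketch {

        public static void main(String[] args) throws Exception {
            // Steps 1-2: the ClusterDescriptor has already deployed this process as the
            // cluster entrypoint and shipped the user program jar as a resource.

            // Step 3: start cluster components (Dispatcher, ResourceManager, ...) here and
            // put the Dispatcher address into the configuration handed to the user program,
            // so that the generated Executor submits through a local RpcClusterClient.

            // Steps 4-5: invoke the user program's main method in-process. Every
            // env.execute() inside it then submits through the local client, so multiple
            // job graphs per program are supported; the entrypoint exits once the user
            // main returns and all submitted jobs have terminated.
            String userMainClass = args[0]; // assumed to be passed by the deployment
            String[] userArgs = new String[args.length - 1];
            System.arraycopy(args, 1, userArgs, 0, userArgs.length);

            Method userMain = Class.forName(userMainClass).getMethod("main", String[].class);
            userMain.invoke(null, (Object) userArgs);
        }
    }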