Hi all, we make heavy use of multiple jobs in one program, and here is why: we fetch data from a huge number of sources, apply some transformation to each source, and then write the union of all outputs into a single directory (this assumes batch processing). When the number of sources is large, doing all of this in a single job makes the graph very big, which is a problem for several reasons:
- too many subtasks/threads per slot
- increased back pressure
- if a single "sub-job" fails, the whole job fails; this is very annoying if it happens after half a day, for example
- in our use case, the big-graph mode takes much longer than running each job separately (but maybe this is true only if you don't have many hardware resources)
- debugging the cause of a failure can become a daunting task if the job graph is too large
- we faced many strange errors when trying to run the single big-job mode (due to serialization corruption)

So, summarizing our overall experience with Flink batch: the simpler the job graph, the better!

Best,
Flavio

On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks to tison for starting this exciting discussion. We also suffer a lot from the per-job mode.
> I think the per-job cluster is a dedicated cluster for only one job and will not accept any other jobs. It has the advantage of one-step submission: there is no need to start a dispatcher first and then submit the job. And it does not matter where the job graph is generated and where the job is submitted.
> Now we have two cases.
>
> (1) Current YARN detached cluster. The job graph is generated on the client and then shipped to the Flink master container via the distributed cache. The MiniDispatcher uses `FileJobGraphRetriever` to get it. The job is submitted on the Flink master side.
>
> (2) Standalone per-job cluster. User jars are already built into the image, so the job graph is generated on the Flink master side and `ClassPathJobGraphRetriever` is used to get it. The job is also submitted on the Flink master side.
>
> For both (1) and (2), only one job per user program is supported. "Per-job" means per job graph, so it works just as expected.
>
> Tison suggests adding a new mode, "per-program". The user jar is transferred to the Flink master container, and a local client is started there to generate the job graph and submit the job. I think it could cover all the functionality of the current per-job mode, both (1) and (2). Also, the detached and attached modes could be unified. We would not need to start a session cluster to simulate per-job for multi-part programs.
>
> I am in favor of the "per-program" mode. Just two concerns.
> 1. How many users are using multiple jobs in one program?
> 2. Why not always use a session cluster to simulate per-job? Maybe one-step submission is a convincing reason.
>
> Best,
> Yang
>
> On Thu, Oct 31, 2019 at 9:18 AM tison <wander4...@gmail.com> wrote:
>
>> Thanks for your attention!
>>
>> @shixiaoga...@gmail.com <shixiaoga...@gmail.com>
>>
>> Yes, correct. We try to prevent jobs from affecting one another. A local ClusterClient also saves the overhead of retrying before the leader is elected and of persisting the JobGraph before submission in RestClusterClient, as well as the network cost.
>>
>> @Paul Lam <paullin3...@gmail.com>
>>
>> 1. There is already a note[1] about multi-part jobs. I was also a bit confused by this concept at first :-) Things work in a similar way if your program contains only one JobGraph, so per-program acts like per-job-graph in that case, which keeps compatibility for the many programs with a single job graph.
>>
>> Besides, we have to respect the user program, which the current implementation doesn't, because we return abruptly when env#execute is called; this hijacks user control so that users cannot deal with the job result or its future. I think this is why we had to add a detach/attach option.
>>
>> 2. For the compilation part, a workaround could be to upload those resources to a commonly known address such as HDFS, so that compilation can read them from either the client or the cluster.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
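For concreteness, here is a minimal sketch of the kind of multi-part batch program discussed above: one job per source, all writing under a shared output directory. The source list, paths, and transformation are illustrative assumptions, not taken from the thread; the last comment marks where the current per-job mode stops respecting the program.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PerSourceBatchJobs {

    public static void main(String[] args) throws Exception {
        // Illustrative list; in practice this could be hundreds of sources/tables.
        String[] sources = {"hdfs:///input/source-a", "hdfs:///input/source-b"};

        for (String source : sources) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            String name = source.substring(source.lastIndexOf('/') + 1);

            env.readTextFile(source)
                    // placeholder per-source transformation
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String line) {
                            return line.trim();
                        }
                    })
                    // every part writes under the same output directory
                    .writeAsText("hdfs:///output/union/" + name);

            // One job graph per source. Under the AS-IS per-job mode, only the job
            // graph of the first execute() call is extracted; code after that call
            // never runs on the cluster, which is the "per-job-graph" limitation
            // discussed in this thread.
            env.execute("ingest " + name);
        }
    }
}
```

In session mode (or the attached YARN mode mentioned below), each loop iteration runs as its own job; the per-program proposal aims to keep that behaviour while still giving the program a dedicated cluster.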
>> On Wed, Oct 30, 2019 at 10:41 PM Newport, Billy <billy.newp...@gs.com> wrote:
>>
>>> We execute multiple job graphs routinely because we cannot submit a single graph without it blowing up. I believe Regina spoke to this in Berlin during her talk. Instead, if we are processing a database ingestion with 200 tables, we create a job graph per table rather than a single job graph that handles all the tables. A single job graph can reach tens of thousands of nodes in our largest cases, and we have found that Flink (as of 1.3/1.6.4) cannot handle graphs of that size. We're currently testing 1.9.1 but have not retested the large-graph scenario.
>>>
>>> Billy
>>>
>>> *From:* Paul Lam [mailto:paullin3...@gmail.com]
>>> *Sent:* Wednesday, October 30, 2019 8:41 AM
>>> *To:* SHI Xiaogang
>>> *Cc:* tison; dev; user
>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>>
>>> Hi,
>>>
>>> Thanks for starting the discussion.
>>>
>>> WRT the per-job semantics, it looks natural to me that per-job means per-job-graph, because in my understanding a JobGraph is the representation of a job. Could you share some use case in which a user program should contain multiple job graphs?
>>>
>>> WRT the per-program mode, I'm also in favor of a unified cluster-side execution for user programs, so +1 from my side.
>>>
>>> But I think there may be some value in the current per-job mode: we now have some common resources available on the client machine that are read by the main methods of user programs. If we migrate to per-program mode, we must explicitly set the specific resources for each user program and ship them to the cluster, which would be a bit inconvenient. Also, as the job graph is compiled at the client, we can recognize errors caused by user code before starting the cluster and easily get access to the logs.
>>>
>>> Best,
>>> Paul Lam
>>>
>>> On Oct 30, 2019, at 16:22, SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Thanks for bringing this up.
>>>
>>> The design looks very nice to me in that
>>> 1. In the new per-job mode, we don't need to compile user programs in the client and can directly run user programs with user jars. That way, resource isolation in multi-tenant platforms is easier and it is much safer.
>>> 2. The execution of user programs can be unified in session and per-job modes. In session mode, user jobs are submitted via a remote ClusterClient, while in per-job mode user jobs are submitted via a local ClusterClient.
>>>
>>> Regards,
>>> Xiaogang
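Regarding Paul's concern about resources read from the client machine, and the workaround tison suggests in point 2 of his reply above: the idea is to keep whatever the main method needs at an address reachable from both the client and the cluster (e.g. HDFS) and read it through Flink's FileSystem abstraction, so it does not matter where the program is compiled. A minimal sketch; the path and the line-per-entry format are illustrative assumptions.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;

public class SharedResourceLoader {

    /**
     * Reads a line-per-entry resource (e.g. a table list) from a shared location,
     * so the main method works whether it runs on the client or on a
     * cluster-side entrypoint.
     */
    public static List<String> loadTableList() throws Exception {
        Path path = new Path("hdfs:///shared/ingest/tables.txt"); // illustrative path
        List<String> tables = new ArrayList<>();
        try (FSDataInputStream in = path.getFileSystem().open(path);
             BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    tables.add(line.trim());
                }
            }
        }
        return tables;
    }
}
```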
>>> On Wed, Oct 30, 2019 at 3:30 PM tison <wander4...@gmail.com> wrote:
>>>
>>> (CC user list because I think users may have ideas on how per-job mode should look)
>>>
>>> Hi all,
>>>
>>> In the discussion about Flink on k8s[1] we encountered a problem: opinions diverge on how the so-called per-job mode should work. This thread is aimed at starting a dedicated discussion about per-job semantics and how to implement them.
>>>
>>> **The AS IS per-job mode**
>>>
>>> * In standalone deployment, we bundle the user jar with the Flink jar, retrieve the JobGraph (the very first JobGraph from the user program on the classpath), and then start a Dispatcher with this JobGraph preconfigured, which launches it as a "recovered" job.
>>>
>>> * In YARN deployment, we accept the submission via CliFrontend, extract the JobGraph (the very first JobGraph from the submitted user program), serialize it and upload it to YARN as a resource, and then, when the AM starts, retrieve the JobGraph as a resource and start a Dispatcher with this JobGraph preconfigured; the rest is the same.
>>>
>>> Specifically, in order to support multi-part jobs, a YARN deployment configured as "attached" starts a session cluster, drives the execution, and shuts down the cluster when the job finishes.
>>>
>>> **Motivation**
>>>
>>> The implementation mentioned above, however, suffers from problems. The two major ones are: 1. only the very first JobGraph from the user program is respected; 2. the job is compiled on the client side.
>>>
>>> 1. Only the very first JobGraph from the user program is respected
>>>
>>> There is already an issue about this topic[2]. As we extract the JobGraph from the user program by hijacking Environment#execute, we actually abort any execution after the first call to #execute. Besides, it has surprised users many times that logic they write in the program is possibly never executed; the underlying problem is the semantic of "job" from Flink's perspective. I'd say that in the current implementation "per-job" is actually "per-job-graph", whereas in practice, since we support jar submission, it is "per-program" semantics that users want.
>>>
>>> 2. The job is compiled on the client side
>>>
>>> Standalone deployment is not in this case, but in YARN deployment we compile the job and get the JobGraph on the client side, and then upload it to YARN. This approach, however, somewhat breaks isolation. We have observed user programs that contain exception-handling logic calling System.exit in the main method, which can cause the compilation of the job to exit the whole client at once. This is a critical problem if we manage multiple Flink jobs on a unified platform: in that case it shuts down the whole service.
>>>
>>> Besides, there are many times I have been asked why per-job mode doesn't run "just like" session mode but with a dedicated cluster. It might imply that the current implementation does not match users' expectations.
>>>
>>> **Proposal**
>>>
>>> In order to provide a "per-program" semantic mode which acts "just like" session mode but with a dedicated cluster, I propose the workflow below. It is like starting a driver on the cluster, but it is not a general driver solution as proposed here[3]; the main purpose of the workflow below is to provide a "per-program" semantic mode.
>>>
>>> *From CliFrontend*
>>>
>>> 1. CliFrontend receives the submission, gathers all configuration, and starts a corresponding ClusterDescriptor.
>>>
>>> 2. The ClusterDescriptor deploys a cluster with main class ProgramClusterEntrypoint, shipping resources including the user program.
>>> 3. ProgramClusterEntrypoint#main contains the logic for starting components including a standalone Dispatcher, configuring the user program to start a RpcClusterClient, and then invoking the main method of the user program.
>>>
>>> 4. RpcClusterClient acts like MiniClusterClient: it is able to submit the JobGraph after the leader is elected, so that we don't fall back to round-robin or fail the submission because no leader exists yet.
>>>
>>> 5. Whether or not the job result is delivered depends on the user program's logic, since we can already get a JobClient from execute. ProgramClusterEntrypoint exits when the user program exits and all globally submitted jobs have terminated.
>>>
>>> This fits the direction of FLIP-73, because the strategy of starting a RpcClusterClient can be regarded as a special Executor. After ProgramClusterEntrypoint#main starts the cluster, it generates and passes configuration to the user program, so that when the Executor is created it knows to use a RpcClusterClient for submission and knows the address of the Dispatcher.
>>>
>>> **Compatibility**
>>>
>>> In my opinion this mode can be purely an add-on to the current codebase. We don't actually replace the current per-job mode with the "per-program" mode. It may turn out that the current per-job mode becomes useless once we have such a "per-program" mode, so that we eventually deprecate it in favor of the other.
>>>
>>> I'm glad to discuss the details further if you're interested, but let's first reach a consensus on the overall design :-)
>>>
>>> Looking forward to your reply!
>>>
>>> Best,
>>> tison.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9953
>>> [2] https://issues.apache.org/jira/browse/FLINK-10879
>>> [3] https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
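As a footnote to problem 2 in tison's mail above, the pattern he describes is roughly the following. The class name, paths, and error handling are hypothetical, but the effect is the one reported in the thread: while the job graph is compiled on the client, a System.exit reached inside the main method terminates the whole CliFrontend JVM of the submitting platform, not just this one program.

```java
import org.apache.flink.api.java.ExecutionEnvironment;

public class NightlyIngest {

    public static void main(String[] args) {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.readTextFile("hdfs:///input/nightly")    // illustrative paths
               .writeAsText("hdfs:///output/nightly");
            env.execute("nightly ingest");
        } catch (Exception e) {
            // Reasonable from the user's point of view: log and exit with a
            // non-zero code. But when the job is compiled on the client (current
            // YARN per-job mode), this exits the shared client process of the
            // submission platform, not just this program.
            e.printStackTrace();
            System.exit(1);
        }
    }
}
```

Compiling and running the main method on a dedicated cluster-side entrypoint, as in the per-program proposal, would confine such exits to that program's own cluster.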