Hi all,
we're using a lot the multiple jobs in one program and this is why: when
you fetch data from a huge number of sources and, for each source, you do
some transformation and then you want to write into a single directory the
union of all outputs (this assumes you're doing batch). When the number of
sources is large, if you want to do this in a single job, the graph becomes
very big and this is a problem for several reasons:

   - too many substasks /threadsi per slot
   - increase of back pressure
   - if a single "sub-job" fails all the job fails..this is very annoying
   if this happens after a half a day for example
   - In our use case, the big-graph mode takes much longer than running
   each job separately (but maybe this is true only if you don't have much
   hardware resources)
   - debugging the cause of a fail could become a daunting task if the job
   graph is too large
   - we faced may strange errors when trying to run the single big-job mode
   (due to serialization corruption)

So, summarizing our overall experience with Flink batch is: the easier is
the job graph the better!

Best,
Flavio


On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Thanks for tison starting this exciting discussion. We also suffer a lot
> from the per job mode.
> I think the per-job cluster is a dedicated cluster for only one job and
> will not accept more other
> jobs. It has the advantage of one-step submission, do not need to start
> dispatcher first and
> then submit the job. And it does not matter where the job graph is
> generated and job is submitted.
> Now we have two cases.
>
> (1) Current Yarn detached cluster. The job graph is generated in client
> and then use distributed
> cache to flink master container. And the MiniDispatcher uses
> `FileJobGraphRetrieve` to get it.
> The job will be submitted at flink master side.
>
>
> (2) Standalone per job cluster. User jars are already built into image. So
> the job graph will be
> generated at flink master side and `ClasspathJobGraphRetriver` is used to
> get it. The job will
> also be submitted at flink master side.
>
>
> For the (1) and (2), only one job in user program could be supported. The
> per job means
> per job-graph, so it works just as expected.
>
>
> Tison suggests to add a new mode "per-program”. The user jar will be
> transferred to flink master
> container, and a local client will be started to generate job graph and
> submit job. I think it could
> cover all the functionality of current per job, both (1) and (2). Also the
> detach mode and attach
> mode could be unified. We do not need to start a session cluster to
> simulate per job for multiple parts.
>
>
> I am in favor of the “per-program” mode. Just two concerns.
> 1. How many users are using multiple jobs in one program?
> 2. Why do not always use session cluster to simulate per job? Maybe
> one-step submission
> is a convincing reason.
>
>
>
> Best,
> Yang
>
> tison <wander4...@gmail.com> 于2019年10月31日周四 上午9:18写道:
>
>> Thanks for your attentions!
>>
>> @shixiaoga...@gmail.com <shixiaoga...@gmail.com>
>>
>> Yes correct. We try to avoid jobs affect one another. Also a local
>> ClusterClient
>> in case saves the overhead about retry before leader elected and persist
>> JobGraph before submission in RestClusterClient as well as the net cost.
>>
>> @Paul Lam <paullin3...@gmail.com>
>>
>> 1. Here is already a note[1] about multiple part jobs. I am also confused
>> a bit
>> on this concept at first :-) Things go in similar way if you program
>> contains the
>> only JobGraph so that I think per-program acts like per-job-graph in this
>> case
>> which provides compatibility for many of one job graph program.
>>
>> Besides, we have to respect user program which doesn't with current
>> implementation because we return abruptly when calling env#execute which
>> hijack user control so that they cannot deal with the job result or the
>> future of
>> it. I think this is why we have to add a detach/attach option.
>>
>> 2. For compilation part, I think it could be a workaround that you upload
>> those
>> resources in a commonly known address such as HDFS so that compilation
>> can read from either client or cluster.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>>
>>
>> Newport, Billy <billy.newp...@gs.com> 于2019年10月30日周三 下午10:41写道:
>>
>>> We execute multiple job graphs routinely because we cannot submit a
>>> single graph without it blowing up. I believe Regina spoke to this in
>>> Berlin during her talk. We instead if we are processing a database
>>> ingestion with 200 tables in it, we do a job graph per table rather than a
>>> single job graph that does all tables instead. A single job graph can be in
>>> the tens of thousands of nodes in our largest cases and we have found flink
>>> (as os 1.3/1.6.4) cannot handle graphs of that size. We’re currently
>>> testing 1.9.1 but have not retested the large graph scenario.
>>>
>>>
>>>
>>> Billy
>>>
>>>
>>>
>>>
>>>
>>> *From:* Paul Lam [mailto:paullin3...@gmail.com]
>>> *Sent:* Wednesday, October 30, 2019 8:41 AM
>>> *To:* SHI Xiaogang
>>> *Cc:* tison; dev; user
>>> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> Thanks for starting the discussion.
>>>
>>>
>>>
>>> WRT the per-job semantic, it looks natural to me that per-job means
>>> per-job-graph,
>>>
>>> because in my understanding JobGraph is the representation of a job.
>>> Could you
>>>
>>> share some use case in which a user program should contain multiple job
>>> graphs?
>>>
>>>
>>>
>>> WRT the per-program mode, I’m also in flavor of a unified cluster-side
>>> execution
>>>
>>> for user program, so +1 from my side.
>>>
>>>
>>>
>>> But I think there may be some values for the current per-job mode: we
>>> now have
>>>
>>> some common resources available on the client machine that would be read
>>> by main
>>>
>>> methods in user programs. If migrated to per-program mode, we must
>>> explicitly
>>>
>>> set the specific resources for each user program and ship them to the
>>> cluster,
>>>
>>> it would be a bit inconvenient.  Also, as the job graph is compiled at
>>> the client,
>>>
>>> we can recognize the errors caused by user code before starting the
>>> cluster
>>>
>>> and easily get access to the logs.
>>>
>>>
>>>
>>> Best,
>>>
>>> Paul Lam
>>>
>>>
>>>
>>> 在 2019年10月30日,16:22,SHI Xiaogang <shixiaoga...@gmail.com> 写道:
>>>
>>>
>>>
>>> Hi
>>>
>>>
>>>
>>> Thanks for bringing this.
>>>
>>>
>>>
>>> The design looks very nice to me in that
>>>
>>> 1. In the new per-job mode, we don't need to compile user programs in
>>> the client and can directly run user programs with user jars. That way,
>>> it's easier for resource isolation in multi-tenant platforms and is much
>>> safer.
>>>
>>> 2. The execution of user programs can be unified in session and per-job
>>> modes. In session mode, user jobs are submitted via a remote ClusterClient
>>> while in per-job mode user jobs are submitted via a local ClusterClient.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Xiaogang
>>>
>>>
>>>
>>> tison <wander4...@gmail.com> 于2019年10月30日周三 下午3:30写道:
>>>
>>> (CC user list because I think users may have ideas on how per-job mode
>>> should look like)
>>>
>>>
>>>
>>> Hi all,
>>>
>>> In the discussion about Flink on k8s[1] we encounter a problem that
>>> opinions
>>> diverge in how so-called per-job mode works. This thread is aimed at
>>> stating
>>> a dedicated discussion about per-job semantic and how to implement it.
>>>
>>> **The AS IS per-job mode**
>>>
>>> * in standalone deployment, we bundle user jar with Flink jar, retrieve
>>> JobGraph which is the very first JobGraph from user program in classpath,
>>> and then start a Dispatcher with this JobGraph preconfigured, which
>>> launches it as "recovered" job.
>>>
>>> * in YARN deployment, we accept submission via CliFrontend, extract
>>> JobGraph
>>> which is the very first JobGraph from user program submitted, serialize
>>> the JobGraph and upload it to YARN as resource, and then when AM starts,
>>> retrieve the JobGraph as resource and start Dispatcher with this JobGraph
>>> preconfigured, follows are the same.
>>>
>>> Specifically, in order to support multiple parts job, if YARN deployment
>>> configured as "attached", it starts a SessionCluster, proceeds the
>>> progress
>>> and shutdown the cluster on job finished.
>>>
>>> **Motivation**
>>>
>>> The implementation mentioned above, however, suffers from problems. The
>>> major
>>> two of them are 1. only respect the very first JobGraph from user
>>> program 2.
>>> compile job in client side
>>>
>>> 1. Only respect the very first JobGraph from user program
>>>
>>> There is already issue about this topic[2]. As we extract JobGraph from
>>> user
>>> program by hijacking Environment#execute we actually abort any execution
>>> after the first call to #execute. Besides it surprises users many times
>>> that
>>> any logic they write in the program is possibly never executed, here the
>>> problem is that the semantic of "job" from Flink perspective. I'd like
>>> to say
>>> in current implementation "per-job" is actually "per-job-graph". However,
>>> in practices since we support jar submission it is "per-program" semantic
>>> wanted.
>>>
>>> 2. Compile job in client side
>>>
>>> Well, standalone deployment is not in the case. But in YARN deployment,
>>> we
>>> compile job and get JobGraph in client side, and then upload it to YARN.
>>> This approach, however, somehow breaks isolation. We have observed that
>>> user
>>> program contains exception handling logic which call System.exit in main
>>> method, which causes a compilation of the job exit the whole client at
>>> once.
>>> It is a critical problem if we manage multiple Flink job in a unique
>>> platform.
>>> In this case, it shut down the whole service.
>>>
>>> Besides there are many times I was asked why per-job mode doesn't run
>>> "just like" session mode but with a dedicated cluster. It might imply
>>> that
>>> current implementation mismatches users' demand.
>>>
>>> **Proposal**
>>>
>>> In order to provide a "per-program" semantic mode which acts "just like"
>>> session
>>> mode but with a dedicated cluster, I propose a workflow as below. It
>>> acts like
>>> starting a drive on cluster but is not a general driver solution as
>>> proposed
>>> here[3], the main purpose of the workflow below is for providing a
>>> "per-program"
>>> semantic mode.
>>>
>>> *From CliFrontend*
>>>
>>> 1. CliFrontend receives submission, gathers all configuration and starts
>>> a
>>> corresponding ClusterDescriptor.
>>>
>>> 2. ClusterDescriptor deploys a cluster with main class
>>> ProgramClusterEntrypoint
>>> while shipping resources including user program.
>>>
>>> 3. ProgramClusterEntrypoint#main contains logic starting components
>>> including
>>> Standalone Dispatcher, configuring user program to start a
>>> RpcClusterClient,
>>> and then invoking main method of user program.
>>>
>>> 4. RpcClusterClient acts like MiniClusterClient which is able to submit
>>> the
>>> JobGraph after leader elected so that we don't fallback to round-robin or
>>> fail submission due to no leader.
>>>
>>> 5. Whether or not deliver job result depends on user program logic,
>>> since we
>>> can already get a JobClient from execute. ProgramClusterEntrypoint exits
>>> on
>>> user program exits and all jobs submitted globally terminate.
>>>
>>> This way fits in the direction of FLIP-73 because strategy starting a
>>> RpcClusterClient can be regarded as a special Executor. After
>>> ProgramClusterEntrypoint#main starts a Cluster, it generates and passes
>>> configuration to
>>> user program so that when Executor generated, it knows to use a
>>> RpcClusterClient
>>> for submission and the address of Dispatcher.
>>>
>>> **Compatibility**
>>>
>>> In my opinion this mode can be totally an add-on to current codebase. We
>>> actually don't replace current per-job mode with so-called "per-program"
>>> mode.
>>> It happens that current per-job mode would be useless if we have such
>>> "per-program" mode so that we possibly deprecate it for preferring the
>>> other.
>>>
>>> I'm glad to discuss more into details if you're interested in, but let's
>>> say
>>> we'd better first reach a consensus on the overall design :-)
>>>
>>> Looking forward to your reply!
>>>
>>> Best,
>>> tison.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9953
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D9953&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=p428wH8eWmBwyjHaE0vClbGi51CQxgjJ6Js3X9Kyr04&e=>
>>> [2] https://issues.apache.org/jira/browse/FLINK-10879
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D10879&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=mEzfvloedca1XW6pqI9LrR--IKhrkg-YmFMXRULqVSQ&e=>
>>> [3]
>>> https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
>>> <https://urldefense.proofpoint.com/v2/url?u=https-3A__docs.google.com_document_d_1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY_edit-23&d=DwMFoQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=rlkM70D3djmDN7dGPzzbVKG26ShcTFDMKlX5AWucE5Q&m=fT0zUrRT-N5XEE85dO3q03TkGf3bN1V3el5frnzSQsg&s=XNVcSV52D3KneNkZgP7tgo9Y4uBm0jsN0RfYaelP7JM&e=>
>>>
>>>
>>>
>>> ------------------------------
>>>
>>> Your Personal Data: We may collect and process information about you
>>> that may be subject to data protection laws. For more information about how
>>> we use and disclose your personal data, how we protect your information,
>>> our legal basis to use your information, your rights and who you can
>>> contact, please refer to: www.gs.com/privacy-notices
>>>
>>

Reply via email to