Supporting multiple jobs in the same application is a big challenge for us. For example:

1. If some jobs have already finished, we should not run them again when the JobManager fails over. That means we need to store the finished jobs in the HA services.
2. Some jobs only run under certain conditions, which makes recovery even more complicated:

if (xxxx) {
    env.executeAsync("job1");
}
env.executeAsync("job2");

Best,
Yang
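A minimal, self-contained sketch of the pattern above (the class name, the bounded fromElements sources, and the flag-driven condition are illustrative only; as the rest of the thread explains, such a multi-executeAsync application is only supported when HA is disabled):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class ConditionalJobsApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Illustrative condition, e.g. driven by a program argument.
        boolean runJob1 = args.length > 0 && "with-job1".equals(args[0]);

        if (runJob1) {
            DataStream<String> stream1 = env.fromElements("a", "b", "c");
            stream1.addSink(new DiscardingSink<>());
            env.executeAsync("job1"); // non-blocking: returns once job1 is submitted
        }

        // Each executeAsync() submits the pipeline built since the previous submission,
        // so job2 only contains stream2.
        DataStream<String> stream2 = env.fromElements("x", "y", "z");
        stream2.addSink(new DiscardingSink<>());
        env.executeAsync("job2"); // submitted without waiting for job1 to finish
    }
}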
On Fri, Jun 25, 2021 at 5:13 AM Qihua Yang <yang...@gmail.com> wrote:

> Hi,
>
> We are using HA mode. Looks like multiple jobs are not an option for us...
> That makes sense! Thanks for your help, guys!
>
> Thanks,
> Qihua
>
> On Wed, Jun 23, 2021 at 7:28 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> Robert is right. We can only support single job submission in
>> application mode when the HA mode is enabled.
>>
>> This is a known limitation of the current application mode implementation.
>>
>> Best,
>> Yang
>>
>> On Thu, Jun 24, 2021 at 3:54 AM Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Thanks a lot for checking again. I just started Flink in Application
>>> mode with a jar that contains two "executeAsync" submissions, and indeed
>>> two jobs are running.
>>>
>>> I think the problem in your case is that you are using High Availability
>>> (I guess, because there are log statements from the
>>> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>>>
>>>> The Application Mode allows for multi-execute() applications but
>>>> High-Availability is not supported in these cases. High-Availability in
>>>> Application Mode is only supported for single-execute() applications.
>>>
>>> See also: https://issues.apache.org/jira/browse/FLINK-19358
>>>
>>> Sorry again that I gave you invalid information in my first answer.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>>>
>>> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> But I saw the Flink docs say that application mode can run multiple
>>>> jobs? Or did I misunderstand them?
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>>>>
>>>> *Compared to the Per-Job mode, the Application Mode allows the submission
>>>> of applications consisting of multiple jobs. The order of job execution is
>>>> not affected by the deployment mode but by the call used to launch the
>>>> job. Using execute(), which is blocking, establishes an order and it will
>>>> lead to the execution of the "next" job being postponed until "this" job
>>>> finishes. Using executeAsync(), which is non-blocking, will lead to the
>>>> "next" job starting before "this" job finishes.*
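To make the quoted passage concrete, a minimal sketch of the blocking execute() ordering on bounded pipelines (class name and sources are illustrative; the "next" job is only submitted after "this" job has finished):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class SequentialJobsApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // "This" job: a bounded pipeline.
        env.fromElements("a", "b", "c").addSink(new DiscardingSink<>());
        env.execute("job1"); // blocking: returns only when job1 finishes

        // The "next" job is postponed until job1 has finished.
        env.fromElements("x", "y", "z").addSink(new DiscardingSink<>());
        env.execute("job2");
    }
}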
>>>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org> wrote:
>>>>
>>>>> Hi Qihua,
>>>>>
>>>>> Application Mode is meant for executing one job at a time, not
>>>>> multiple jobs on the same JobManager.
>>>>> If you want to do that, you need to use session mode, which allows
>>>>> managing multiple jobs on the same JobManager.
>>>>>
>>>>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>>
>>>>>> Do you know if I can start multiple jobs for a single Flink
>>>>>> application?
>>>>>>
>>>>>> Thanks,
>>>>>> Qihua
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am using application mode.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Qihua
>>>>>>>
>>>>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Qihua,
>>>>>>>>
>>>>>>>> Which execution mode are you using?
>>>>>>>>
>>>>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thank you for your reply. What I want is a Flink app that has
>>>>>>>>> multiple jobs, each job managing one stream. Currently our Flink app
>>>>>>>>> has only one job that manages multiple streams.
>>>>>>>>> I did try env.executeAsync(), but it still doesn't work. From the
>>>>>>>>> log, when the second executeAsync() was called, it showed "*Job
>>>>>>>>> 00000000000000000000000000000000 was recovered successfully.*"
>>>>>>>>> It looks like the second executeAsync() recovered the first job
>>>>>>>>> instead of starting a second job.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Qihua
>>>>>>>>>
>>>>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>>>>>> executeAsync or use a separate thread to submit the second job.
>>>>>>>>>> If Job 1 finishes, then this would also work with sequential
>>>>>>>>>> execution.
>>>>>>>>>>
>>>>>>>>>> However, I think what you actually want to do is to use the same
>>>>>>>>>> env with two topologies and one single execute, like this:
>>>>>>>>>>
>>>>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>>>>> env.execute("Job 1+2");
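The separate-thread alternative Arvid mentions (but does not show) could look roughly like this. The names and bounded sources are placeholders, and per the discussion above this also requires HA to be disabled:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class TwoThreadsApp {
    public static void main(String[] args) throws Exception {
        // Submit the second job from its own thread so the blocking
        // execute() in the main thread does not delay it.
        Thread second = new Thread(() -> {
            try {
                StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
                env2.fromElements("x", "y").addSink(new DiscardingSink<>());
                env2.execute("job2"); // runs concurrently with job1
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        second.start();

        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
        env1.fromElements("a", "b").addSink(new DiscardingSink<>());
        env1.execute("job1"); // blocks in the main thread

        second.join();
    }
}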
>>>>>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Does anyone know how to run multiple jobs in the same Flink
>>>>>>>>>>> application?
>>>>>>>>>>> I did a simple test. The first job was started, and I did see its
>>>>>>>>>>> log message. But the second job was never started, even though I
>>>>>>>>>>> saw its log message.
>>>>>>>>>>>
>>>>>>>>>>> public void testJobs() throws Exception {
>>>>>>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>>>>     printf("### first job");
>>>>>>>>>>>     env.execute("Job 1");
>>>>>>>>>>>
>>>>>>>>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>>>>     stream2.addSink(new DiscardingSink<>());
>>>>>>>>>>>     printf("### second job");
>>>>>>>>>>>     env.execute("Job 2");
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Here is the log:
>>>>>>>>>>>
>>>>>>>>>>> ### first job
>>>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job 00000000000000000000000000000000 is submitted.
>>>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Submitting Job with JobId=00000000000000000000000000000000.
>>>>>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 00000000000000000000000000000000 (job1).
>>>>>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job job1 (00000000000000000000000000000000) under job master id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>>>>>>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job job1 (00000000000000000000000000000000) switched from state CREATED to RUNNING.
>>>>>>>>>>>
>>>>>>>>>>> ### second job
>>>>>>>>>>> WARN com.doordash.flink.common.utils.LoggerUtil - Class - IndexWriter : ### second job
>>>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
>>>>>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/00000000000000000000000000000000/job_manager_lock.
>>>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
>>>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job 00000000000000000000000000000000 was recovered successfully.
>>>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
>>>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 956d4431ca90d45d92c027046cd0404e.
>>>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 21134414fc60d4ef3e940609cef960b6.
>>>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 650bd9100a35ef5086fd4614f5253b55.