Hi, We are using HA mode. Looks like multiple jobs is not an option for us.... That makes sense! Thanks for your guys' help!
Thanks, Qihua On Wed, Jun 23, 2021 at 7:28 PM Yang Wang <danrtsey...@gmail.com> wrote: > Robert is right. We Could only support single job submission in > application mode when the HA mode is enabled. > > This is a known limitation of current application mode implementation. > > Best, > Yang > > Robert Metzger <rmetz...@apache.org> 于2021年6月24日周四 上午3:54写道: > >> Thanks a lot for checking again. I just started Flink in Application mode >> with a jar that contains two "executeAsync" submissions, and indeed two >> jobs are running. >> >> I think the problem in your case is that you are using High Availability >> (I guess, because there are log statements from the >> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]: >> >> The Application Mode allows for multi-execute() applications but >>> High-Availability is not supported in these cases. High-Availability in >>> Application Mode is only supported for single-execute() applications. >> >> >> See also: https://issues.apache.org/jira/browse/FLINK-19358 >> >> Sorry again that I gave you invalid information in my first answer. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/ >> >> >> >> >> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <yang...@gmail.com> wrote: >> >>> Hi Robert, >>> >>> But I saw Flink doc shows application mode can run multiple jobs? Or I >>> misunderstand it? >>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/ >>> >>> >>> >>> *Compared to the Per-Job mode, the Application Mode allows the submission >>> of applications consisting of multiple jobs. The order of job execution is >>> not affected by the deployment mode but by the call used to launch the job. >>> Using execute(), which is blocking, establishes an order and it will lead >>> to the execution of the "next" job being postponed until "this" job >>> finishes. Using executeAsync(), which is non-blocking, will lead to the >>> "next" job starting before "this" job finishes.* >>> >>> >>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org> >>> wrote: >>> >>>> Hi Qihua, >>>> >>>> Application Mode is meant for executing one job at a time, not multiple >>>> jobs on the same JobManager. >>>> If you want to do that, you need to use session mode, which allows >>>> managing multiple jobs on the same JobManager. >>>> >>>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote: >>>> >>>>> Hi Arvid, >>>>> >>>>> Do you know if I can start multiple jobs for a single flink >>>>> application? >>>>> >>>>> Thanks, >>>>> Qihua >>>>> >>>>> >>>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> I am using application mode. >>>>>> >>>>>> Thanks, >>>>>> Qihua >>>>>> >>>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> >>>>>> wrote: >>>>>> >>>>>>> Hi Qihua, >>>>>>> >>>>>>> Which execution mode are you using? >>>>>>> >>>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> Thank you for your reply. What I want is flink app has multiple >>>>>>>> jobs, each job manage a stream. Currently our flink app has only 1 job >>>>>>>> that >>>>>>>> manage multiple streams. >>>>>>>> I did try env.executeAsyc(), but it still doesn't work. From the >>>>>>>> log, when the second executeAsync() was called, it shows " *Job >>>>>>>> 00000000000000000000000000000000 was recovered successfully.*" >>>>>>>> Looks like the second executeAsync() recover the first job. Not >>>>>>>> start a second job. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Qihua >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use >>>>>>>>> executeAsync or use a separate thread to submit the second job. If >>>>>>>>> Job 1 >>>>>>>>> finishes then this would also work by having sequential execution. >>>>>>>>> >>>>>>>>> However, I think what you actually want to do is to use the same >>>>>>>>> env with 2 topologies and 1 single execute like this. >>>>>>>>> >>>>>>>>> StreamExecutionEnvironment env = >>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>> DataStream<String> stream1 = env.addSource(new >>>>>>>>> SourceFunction<String>()); >>>>>>>>> stream1.addSink(new FlinkKafkaProducer()); >>>>>>>>> DataStream<String> stream2 = env.addSource(new >>>>>>>>> SourceFunction<String>()); >>>>>>>>> stream2.addSink(new DiscardingSink<>()); >>>>>>>>> env.execute("Job 1+2"); >>>>>>>>> >>>>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> Does anyone know how to run multiple jobs in same flink >>>>>>>>>> application? >>>>>>>>>> I did a simple test. First job was started. I did see the log >>>>>>>>>> message, but I didn't see the second job was started, even I saw the >>>>>>>>>> log >>>>>>>>>> message. >>>>>>>>>> >>>>>>>>>> public void testJobs() throws Exception { >>>>>>>>>> StreamExecutionEnvironment env = >>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>> DataStream<String> stream1 = env.addSource(new >>>>>>>>>> SourceFunction<String>()); >>>>>>>>>> stream1.addSink(new FlinkKafkaProducer()); >>>>>>>>>> printf("### first job"); >>>>>>>>>> env.execute("Job 1"); >>>>>>>>>> >>>>>>>>>> env = StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>>>> DataStream<String> stream2 = env.addSource(new >>>>>>>>>> SourceFunction<String>()); >>>>>>>>>> stream2.addSink(new DiscardingSink<>()); >>>>>>>>>> printf("### second job"); >>>>>>>>>> env.execute("Job 2"); >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> Here is the log: >>>>>>>>>> ### first job >>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor >>>>>>>>>> - Job 00000000000000000000000000000000 is submitted. >>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor >>>>>>>>>> - Submitting Job with JobId=00000000000000000000000000000000. >>>>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>>>>>>> Received JobGraph submission 00000000000000000000000000000000 (job1). >>>>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>>>>>>> Submitting job 00000000000000000000000000000000 (job1). >>>>>>>>>> >>>>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>>>>>>>> Starting ZooKeeperLeaderRetrievalService >>>>>>>>>> /leader/resource_manager_lock. >>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>>>>>>>> execution of job job1 (00000000000000000000000000000000) under job >>>>>>>>>> master >>>>>>>>>> id b03cde9dc2aebdb39c46cda4c2a94c07. >>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>>>>>>>> scheduling with scheduling strategy >>>>>>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] >>>>>>>>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - >>>>>>>>>> Job job1 (00000000000000000000000000000000) switched from state >>>>>>>>>> CREATED to >>>>>>>>>> RUNNING. >>>>>>>>>> >>>>>>>>>> ### second job >>>>>>>>>> WARN com.doordash.flink.common.utils.LoggerUtil - Class - >>>>>>>>>> IndexWriter : ### second job >>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved >>>>>>>>>> ResourceManager address, beginning registration >>>>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>>>>>>>> Starting ZooKeeperLeaderRetrievalService >>>>>>>>>> /leader/00000000000000000000000000000000/job_manager_lock. >>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>>>> - Registering job manager b03cde9dc2aebdb39c46cda4c2a94c07 >>>>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for >>>>>>>>>> job 00000000000000000000000000000000. >>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor >>>>>>>>>> - Job 00000000000000000000000000000000 was recovered successfully. >>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>>>> - Registered job manager b03cde9dc2aebdb39c46cda4c2a94c07 >>>>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for >>>>>>>>>> job 00000000000000000000000000000000. >>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager >>>>>>>>>> successfully registered at ResourceManager, leader id: >>>>>>>>>> 956d4431ca90d45d92c027046cd0404e. >>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>>>>>>>> Requesting new slot >>>>>>>>>> [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and >>>>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>>>> - Request slot with profile ResourceProfile{UNKNOWN} for job >>>>>>>>>> 00000000000000000000000000000000 with allocation id >>>>>>>>>> 21134414fc60d4ef3e940609cef960b6. >>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>>>>>>>> Requesting new slot >>>>>>>>>> [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and >>>>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>>>> - Request slot with profile ResourceProfile{UNKNOWN} for job >>>>>>>>>> 00000000000000000000000000000000 with allocation id >>>>>>>>>> 650bd9100a35ef5086fd4614f5253b55. >>>>>>>>>> >>>>>>>>>