Thanks a lot for checking again. I just started Flink in Application mode with a jar that contains two "executeAsync" submissions, and indeed two jobs are running.
I think the problem in your case is that you are using High Availability (I guess, because there are log statements from the ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]: The Application Mode allows for multi-execute() applications but > High-Availability is not supported in these cases. High-Availability in > Application Mode is only supported for single-execute() applications. See also: https://issues.apache.org/jira/browse/FLINK-19358 Sorry again that I gave you invalid information in my first answer. [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/ On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <yang...@gmail.com> wrote: > Hi Robert, > > But I saw Flink doc shows application mode can run multiple jobs? Or I > misunderstand it? > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/ > > > > *Compared to the Per-Job mode, the Application Mode allows the submission of > applications consisting of multiple jobs. The order of job execution is not > affected by the deployment mode but by the call used to launch the job. Using > execute(), which is blocking, establishes an order and it will lead to the > execution of the "next" job being postponed until "this" job finishes. Using > executeAsync(), which is non-blocking, will lead to the "next" job starting > before "this" job finishes.* > > > On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org> > wrote: > >> Hi Qihua, >> >> Application Mode is meant for executing one job at a time, not multiple >> jobs on the same JobManager. >> If you want to do that, you need to use session mode, which allows >> managing multiple jobs on the same JobManager. >> >> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote: >> >>> Hi Arvid, >>> >>> Do you know if I can start multiple jobs for a single flink application? >>> >>> Thanks, >>> Qihua >>> >>> >>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I am using application mode. >>>> >>>> Thanks, >>>> Qihua >>>> >>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote: >>>> >>>>> Hi Qihua, >>>>> >>>>> Which execution mode are you using? >>>>> >>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> Thank you for your reply. What I want is flink app has multiple jobs, >>>>>> each job manage a stream. Currently our flink app has only 1 job that >>>>>> manage multiple streams. >>>>>> I did try env.executeAsyc(), but it still doesn't work. From the log, >>>>>> when the second executeAsync() was called, it shows " *Job >>>>>> 00000000000000000000000000000000 was recovered successfully.*" >>>>>> Looks like the second executeAsync() recover the first job. Not start >>>>>> a second job. >>>>>> >>>>>> Thanks, >>>>>> Qihua >>>>>> >>>>>> >>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> env.execute("Job 1"); is a blocking call. You either have to use >>>>>>> executeAsync or use a separate thread to submit the second job. If Job 1 >>>>>>> finishes then this would also work by having sequential execution. >>>>>>> >>>>>>> However, I think what you actually want to do is to use the same env >>>>>>> with 2 topologies and 1 single execute like this. >>>>>>> >>>>>>> StreamExecutionEnvironment env = >>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>> DataStream<String> stream1 = env.addSource(new >>>>>>> SourceFunction<String>()); >>>>>>> stream1.addSink(new FlinkKafkaProducer()); >>>>>>> DataStream<String> stream2 = env.addSource(new >>>>>>> SourceFunction<String>()); >>>>>>> stream2.addSink(new DiscardingSink<>()); >>>>>>> env.execute("Job 1+2"); >>>>>>> >>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> Does anyone know how to run multiple jobs in same flink application? >>>>>>>> I did a simple test. First job was started. I did see the log >>>>>>>> message, but I didn't see the second job was started, even I saw the >>>>>>>> log >>>>>>>> message. >>>>>>>> >>>>>>>> public void testJobs() throws Exception { >>>>>>>> StreamExecutionEnvironment env = >>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>> DataStream<String> stream1 = env.addSource(new >>>>>>>> SourceFunction<String>()); >>>>>>>> stream1.addSink(new FlinkKafkaProducer()); >>>>>>>> printf("### first job"); >>>>>>>> env.execute("Job 1"); >>>>>>>> >>>>>>>> env = StreamExecutionEnvironment.getExecutionEnvironment(); >>>>>>>> DataStream<String> stream2 = env.addSource(new >>>>>>>> SourceFunction<String>()); >>>>>>>> stream2.addSink(new DiscardingSink<>()); >>>>>>>> printf("### second job"); >>>>>>>> env.execute("Job 2"); >>>>>>>> } >>>>>>>> >>>>>>>> Here is the log: >>>>>>>> ### first job >>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - >>>>>>>> Job 00000000000000000000000000000000 is submitted. >>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - >>>>>>>> Submitting Job with JobId=00000000000000000000000000000000. >>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>>>>> Received JobGraph submission 00000000000000000000000000000000 (job1). >>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>>>>> Submitting job 00000000000000000000000000000000 (job1). >>>>>>>> >>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>>>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. >>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>>>>>> execution of job job1 (00000000000000000000000000000000) under job >>>>>>>> master >>>>>>>> id b03cde9dc2aebdb39c46cda4c2a94c07. >>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>>>>>> scheduling with scheduling strategy >>>>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] >>>>>>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job >>>>>>>> job1 (00000000000000000000000000000000) switched from state CREATED to >>>>>>>> RUNNING. >>>>>>>> >>>>>>>> ### second job >>>>>>>> WARN com.doordash.flink.common.utils.LoggerUtil - Class - >>>>>>>> IndexWriter : ### second job >>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved >>>>>>>> ResourceManager address, beginning registration >>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>>>>>> Starting ZooKeeperLeaderRetrievalService >>>>>>>> /leader/00000000000000000000000000000000/job_manager_lock. >>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>> - Registering job manager b03cde9dc2aebdb39c46cda4c2a94c07 >>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>>>>>>> 00000000000000000000000000000000. >>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - >>>>>>>> Job 00000000000000000000000000000000 was recovered successfully. >>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>> - Registered job manager b03cde9dc2aebdb39c46cda4c2a94c07 >>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>>>>>>> 00000000000000000000000000000000. >>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager >>>>>>>> successfully registered at ResourceManager, leader id: >>>>>>>> 956d4431ca90d45d92c027046cd0404e. >>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>>>>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] >>>>>>>> and >>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>> - Request slot with profile ResourceProfile{UNKNOWN} for job >>>>>>>> 00000000000000000000000000000000 with allocation id >>>>>>>> 21134414fc60d4ef3e940609cef960b6. >>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>>>>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] >>>>>>>> and >>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager >>>>>>>> - Request slot with profile ResourceProfile{UNKNOWN} for job >>>>>>>> 00000000000000000000000000000000 with allocation id >>>>>>>> 650bd9100a35ef5086fd4614f5253b55. >>>>>>>> >>>>>>>