Thanks a lot for checking again. I just started Flink in Application mode
with a jar that contains two "executeAsync" submissions, and indeed two
jobs are running.
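
For anyone following along, the difference between the blocking and non-blocking calls discussed further down in this thread can be modelled without Flink at all. The sketch below is only a toy model in plain Java: `SubmissionOrderSketch`, `submitBlocking`, `submitAsync`, `blockingOrder` and `asyncOrder` are hypothetical names I made up for illustration, not Flink APIs, and the "jobs" are just threads that wait on a latch. It is meant to show the ordering behaviour, not how Flink implements submission.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Toy model only: none of these names are Flink APIs.
class SubmissionOrderSketch {

    // Wait on a latch, converting interruption into an unchecked exception.
    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Models a non-blocking submission (like executeAsync()): the job runs on
    // its own thread and the call returns as soon as the job has started.
    static CompletableFuture<Void> submitAsync(
            String name, CountDownLatch finish, List<String> events) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            events.add(name + " started");
            started.countDown();
            await(finish); // the job keeps running until it is allowed to finish
            events.add(name + " finished");
            done.complete(null);
        });
        worker.setDaemon(true);
        worker.start();
        await(started); // return only once the job is recorded as running
        return done;
    }

    // Models a blocking submission (like execute()): the call returns only
    // after the job has finished.
    static void submitBlocking(
            String name, CountDownLatch finish, List<String> events) {
        submitAsync(name, finish, events).join();
    }

    // Two bounded jobs submitted with the blocking call run strictly in sequence.
    static List<String> blockingOrder() {
        List<String> events = new CopyOnWriteArrayList<>();
        submitBlocking("job1", new CountDownLatch(0), events);
        submitBlocking("job2", new CountDownLatch(0), events);
        return List.copyOf(events);
    }

    // With the non-blocking call, job2 starts while job1 is still running.
    static List<String> asyncOrder() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch finish1 = new CountDownLatch(1);
        CountDownLatch finish2 = new CountDownLatch(1);
        CompletableFuture<Void> job1 = submitAsync("job1", finish1, events);
        CompletableFuture<Void> job2 = submitAsync("job2", finish2, events);
        finish1.countDown(); // let job1 finish first
        job1.join();
        finish2.countDown(); // then let job2 finish
        job2.join();
        return List.copyOf(events);
    }

    public static void main(String[] args) {
        System.out.println("blocking:     " + blockingOrder());
        System.out.println("non-blocking: " + asyncOrder());
    }
}
```

In this model, if the first job's latch were never released (as with an unbounded streaming source), `submitBlocking` would never return and the second submission would never be reached, which matches the behaviour described for env.execute("Job 1") below.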

I think the problem in your case is that you are using High Availability (I
guess, because there are log statements from the
ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:

> The Application Mode allows for multi-execute() applications but
> High-Availability is not supported in these cases. High-Availability in
> Application Mode is only supported for single-execute() applications.

See also:

Sorry again that I gave you invalid information in my first answer.


On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <> wrote:

> Hi Robert,
> But the Flink docs say that application mode can run multiple jobs. Or am
> I misunderstanding them?
> *Compared to the Per-Job mode, the Application Mode allows the submission of 
> applications consisting of multiple jobs. The order of job execution is not 
> affected by the deployment mode but by the call used to launch the job. Using 
> execute(), which is blocking, establishes an order and it will lead to the 
> execution of the "next" job being postponed until "this" job finishes. Using 
> executeAsync(), which is non-blocking, will lead to the "next" job starting 
> before "this" job finishes.*
> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <>
> wrote:
>> Hi Qihua,
>> Application Mode is meant for executing one job at a time, not multiple
>> jobs on the same JobManager.
>> If you want to do that, you need to use session mode, which allows
>> managing multiple jobs on the same JobManager.
>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <> wrote:
>>> Hi Arvid,
>>> Do you know if I can start multiple jobs for a single flink application?
>>> Thanks,
>>> Qihua
>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <> wrote:
>>>> Hi,
>>>> I am using application mode.
>>>> Thanks,
>>>> Qihua
>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <> wrote:
>>>>> Hi Qihua,
>>>>> Which execution mode are you using?
>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <> wrote:
>>>>>> Hi,
>>>>>> Thank you for your reply. What I want is a Flink app with multiple jobs,
>>>>>> each job managing one stream. Currently our Flink app has only one job
>>>>>> that manages multiple streams.
>>>>>> I did try env.executeAsync(), but it still doesn't work. From the log,
>>>>>> when the second executeAsync() was called, it shows " *Job
>>>>>> 00000000000000000000000000000000 was recovered successfully.*"
>>>>>> It looks like the second executeAsync() recovers the first job instead
>>>>>> of starting a second job.
>>>>>> Thanks,
>>>>>> Qihua
>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <> wrote:
>>>>>>> Hi,
>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>>>>>> finishes then this would also work by having sequential execution.
>>>>>>> However, I think what you actually want to do is to use the same env
>>>>>>> with 2 topologies and 1 single execute like this.
>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>> env.execute("Job 1+2");
>>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <>
>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> Does anyone know how to run multiple jobs in the same Flink application?
>>>>>>>> I did a simple test. The first job was started and I saw its log
>>>>>>>> message, but the second job was not started, even though I saw its
>>>>>>>> log message.
>>>>>>>> public void testJobs() throws Exception {
>>>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>     printf("### first job");
>>>>>>>>     env.execute("Job 1");
>>>>>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>     stream2.addSink(new DiscardingSink<>());
>>>>>>>>     printf("### second job");
>>>>>>>>     env.execute("Job 2");
>>>>>>>> }
>>>>>>>> Here is the log:
>>>>>>>> ### first job
>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>>> Job 00000000000000000000000000000000 is submitted.
>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>>> Submitting Job with JobId=00000000000000000000000000000000.
>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>>> Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>>> Submitting job 00000000000000000000000000000000 (job1).
>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>>> execution of job job1 (00000000000000000000000000000000) under job
>>>>>>>> master id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>>> scheduling with scheduling strategy
>>>>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>>>>>>>> job1 (00000000000000000000000000000000) switched from state CREATED to
>>>>>>>> RUNNING.
>>>>>>>> ### second job
>>>>>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class -
>>>>>>>> IndexWriter : ### second job
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>>>>>>> ResourceManager address, beginning registration
>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>>> Starting ZooKeeperLeaderRetrievalService
>>>>>>>> /leader/00000000000000000000000000000000/job_manager_lock.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Registering job manager b03cde9dc2aebdb39c46cda4c2a94c07
>>>>>>>> @akka.tcp://flink@ for job
>>>>>>>> 00000000000000000000000000000000.
>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>>> Job 00000000000000000000000000000000 was recovered successfully.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Registered job manager b03cde9dc2aebdb39c46cda4c2a94c07
>>>>>>>> @akka.tcp://flink@ for job
>>>>>>>> 00000000000000000000000000000000.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>>>>>>> successfully registered at ResourceManager, leader id:
>>>>>>>> 956d4431ca90d45d92c027046cd0404e.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}]
>>>>>>>> and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>>> 21134414fc60d4ef3e940609cef960b6.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}]
>>>>>>>> and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>>> 650bd9100a35ef5086fd4614f5253b55.
