Thanks a lot for checking again. I just started Flink in Application mode
with a jar that contains two "executeAsync" submissions, and indeed two
jobs are running.
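
For reference, here is a minimal sketch of what the entry point of such a jar
might look like. It is not the exact code I ran: the class name TwoAsyncJobs,
the helper submitBoth(), and the fromElements() sources are placeholders chosen
for illustration, and it assumes a Flink version that provides executeAsync()
(1.10 or later) with no High Availability configured.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

// Hypothetical entry point: submits two independent jobs from one main().
public class TwoAsyncJobs {

    // Builds two small pipelines and submits each one with the non-blocking
    // executeAsync(), returning the JobIDs reported by the two JobClients.
    static List<JobID> submitBoth() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        List<JobID> ids = new ArrayList<>();

        // First topology; placeholder source and sink.
        env.fromElements("a", "b", "c").addSink(new DiscardingSink<>());
        JobClient first = env.executeAsync("Job 1");
        ids.add(first.getJobID());

        // Second topology, defined after the first submission has returned.
        env.fromElements("x", "y", "z").addSink(new DiscardingSink<>());
        JobClient second = env.executeAsync("Job 2");
        ids.add(second.getJobID());

        return ids;
    }

    public static void main(String[] args) throws Exception {
        for (JobID id : submitBoth()) {
            System.out.println("Submitted job " + id);
        }
    }
}
```

Because executeAsync() returns as soon as the job is submitted, the second
pipeline is submitted while the first may still be running. With execute()
instead, the second submission would wait until the first job finishes.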

I think the problem in your case is that you are using High Availability (I
guess, because there are log statements from the
ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:

> The Application Mode allows for multi-execute() applications but
> High-Availability is not supported in these cases. High-Availability in
> Application Mode is only supported for single-execute() applications.


See also: https://issues.apache.org/jira/browse/FLINK-19358

Sorry again that I gave you invalid information in my first answer.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/




On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <yang...@gmail.com> wrote:

> Hi Robert,
>
> But the Flink docs say that Application Mode can run multiple jobs. Or am I
> misunderstanding them?
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>
>
>
> *Compared to the Per-Job mode, the Application Mode allows the submission of 
> applications consisting of multiple jobs. The order of job execution is not 
> affected by the deployment mode but by the call used to launch the job. Using 
> execute(), which is blocking, establishes an order and it will lead to the 
> execution of the "next" job being postponed until "this" job finishes. Using 
> executeAsync(), which is non-blocking, will lead to the "next" job starting 
> before "this" job finishes.*
>
>
> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Qihua,
>>
>> Application Mode is meant for executing one job at a time, not multiple
>> jobs on the same JobManager.
>> If you want to do that, you need to use session mode, which allows
>> managing multiple jobs on the same JobManager.
>>
>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> Do you know if I can start multiple jobs for a single flink application?
>>>
>>> Thanks,
>>> Qihua
>>>
>>>
>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am using application mode.
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Qihua,
>>>>>
>>>>> Which execution mode are you using?
>>>>>
>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thank you for your reply. What I want is a Flink app with multiple jobs,
>>>>>> each job managing one stream. Currently our Flink app has only one job
>>>>>> that manages multiple streams.
>>>>>> I did try env.executeAsync(), but it still doesn't work. From the log,
>>>>>> when the second executeAsync() was called, it shows " *Job
>>>>>> 00000000000000000000000000000000 was recovered successfully.*"
>>>>>> It looks like the second executeAsync() recovered the first job instead
>>>>>> of starting a second job.
>>>>>>
>>>>>> Thanks,
>>>>>> Qihua
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>>>>>> finishes then this would also work by having sequential execution.
>>>>>>>
>>>>>>> However, I think what you actually want to do is to use the same env
>>>>>>> with 2 topologies and 1 single execute like this.
>>>>>>>
>>>>>>> StreamExecutionEnvironment env =
>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>> env.execute("Job 1+2");
>>>>>>>
>>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Does anyone know how to run multiple jobs in the same Flink application?
>>>>>>>> I did a simple test. The first job was started, and I saw its log
>>>>>>>> message. The second job was not started, even though I saw its log
>>>>>>>> message as well.
>>>>>>>>
>>>>>>>> public void testJobs() throws Exception {
>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>     printf("### first job");
>>>>>>>>     env.execute("Job 1");
>>>>>>>>
>>>>>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>     stream2.addSink(new DiscardingSink<>());
>>>>>>>>     printf("### second job");
>>>>>>>>     env.execute("Job 2");
>>>>>>>> }
>>>>>>>>
>>>>>>>> Here is the log:
>>>>>>>> ### first job
>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>>> Job 00000000000000000000000000000000 is submitted.
>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>>> Submitting Job with JobId=00000000000000000000000000000000.
>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>>> Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>>> Submitting job 00000000000000000000000000000000 (job1).
>>>>>>>>
>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>>> execution of job job1 (00000000000000000000000000000000) under job master
>>>>>>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>>> scheduling with scheduling strategy
>>>>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>>>>>>>> job1 (00000000000000000000000000000000) switched from state CREATED to
>>>>>>>> RUNNING.
>>>>>>>>
>>>>>>>> ### second job
>>>>>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class -
>>>>>>>> IndexWriter : ### second job
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>>>>>>> ResourceManager address, beginning registration
>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>>> Starting ZooKeeperLeaderRetrievalService
>>>>>>>> /leader/00000000000000000000000000000000/job_manager_lock.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Registering job manager b03cde9dc2aebdb39c46cda4c2a94c07
>>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>>>>>> 00000000000000000000000000000000.
>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>>> Job 00000000000000000000000000000000 was recovered successfully.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Registered job manager b03cde9dc2aebdb39c46cda4c2a94c07
>>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>>>>>> 00000000000000000000000000000000.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>>>>>>> successfully registered at ResourceManager, leader id:
>>>>>>>> 956d4431ca90d45d92c027046cd0404e.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
>>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>>> 21134414fc60d4ef3e940609cef960b6.
>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
>>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>  - Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>>> 650bd9100a35ef5086fd4614f5253b55.
>>>>>>>>
>>>>>>>
