Hi,

We are using HA mode, so it looks like running multiple jobs is not an option for us.
That makes sense! Thanks for your help, everyone!

Thanks,
Qihua


On Wed, Jun 23, 2021 at 7:28 PM Yang Wang <danrtsey...@gmail.com> wrote:

> Robert is right. We can only support single-job submission in
> application mode when HA mode is enabled.
>
> This is a known limitation of the current application mode implementation.
>
> Best,
> Yang
>
> Robert Metzger <rmetz...@apache.org> wrote on Thu, Jun 24, 2021 at 3:54 AM:
>
>> Thanks a lot for checking again. I just started Flink in Application mode
>> with a jar that contains two "executeAsync" submissions, and indeed two
>> jobs are running.
>>
>> I think the problem in your case is that you are using High Availability
>> (I guess, because there are log statements from the
>> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>>
>>> The Application Mode allows for multi-execute() applications but
>>> High-Availability is not supported in these cases. High-Availability in
>>> Application Mode is only supported for single-execute() applications.
>>
>>
>> See also: https://issues.apache.org/jira/browse/FLINK-19358
>>
>> Sorry again that I gave you invalid information in my first answer.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>>
>>
>>
>>
>> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Hi Robert,
>>>
>>> But the Flink docs say that application mode can run multiple jobs. Or am I
>>> misunderstanding them?
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>>>
>>>
>>>
>>> *Compared to the Per-Job mode, the Application Mode allows the submission 
>>> of applications consisting of multiple jobs. The order of job execution is 
>>> not affected by the deployment mode but by the call used to launch the job. 
>>> Using execute(), which is blocking, establishes an order and it will lead 
>>> to the execution of the "next" job being postponed until "this" job 
>>> finishes. Using executeAsync(), which is non-blocking, will lead to the 
>>> "next" job starting before "this" job finishes.*
>>>
>>>
>>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Qihua,
>>>>
>>>> Application Mode is meant for executing one job at a time, not multiple
>>>> jobs on the same JobManager.
>>>> If you want to do that, you need to use session mode, which allows
>>>> managing multiple jobs on the same JobManager.
>>>>
>>>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi Arvid,
>>>>>
>>>>> Do you know if I can start multiple jobs for a single flink
>>>>> application?
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>>
>>>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am using application mode.
>>>>>>
>>>>>> Thanks,
>>>>>> Qihua
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Qihua,
>>>>>>>
>>>>>>> Which execution mode are you using?
>>>>>>>
>>>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thank you for your reply. What I want is a Flink app with multiple
>>>>>>>> jobs, where each job manages one stream. Currently our Flink app has
>>>>>>>> only 1 job that manages multiple streams.
>>>>>>>> I did try env.executeAsync(), but it still doesn't work. From the
>>>>>>>> log, when the second executeAsync() was called, it shows " *Job
>>>>>>>> 00000000000000000000000000000000 was recovered successfully.*"
>>>>>>>> It looks like the second executeAsync() recovered the first job
>>>>>>>> instead of starting a second job.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Qihua
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>>>>> executeAsync or use a separate thread to submit the second job. If
>>>>>>>>> Job 1 finishes, the second execute() call would then run, giving you
>>>>>>>>> sequential execution.
>>>>>>>>>
>>>>>>>>> However, I think what you actually want to do is to use the same
>>>>>>>>> env with 2 topologies and a single execute, like this:
>>>>>>>>>
>>>>>>>>> // Sketch only: the source and sink constructors are placeholders.
>>>>>>>>> StreamExecutionEnvironment env =
>>>>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>> // Topology 1
>>>>>>>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>> // Topology 2
>>>>>>>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>>>> // A single execute() submits both topologies as one job.
>>>>>>>>> env.execute("Job 1+2");
>>>>>>>>>
>>>>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> Does anyone know how to run multiple jobs in the same Flink
>>>>>>>>>> application?
>>>>>>>>>> I did a simple test. The first job was started and I saw its log
>>>>>>>>>> message, but the second job was not started, even though I saw its
>>>>>>>>>> log message.
>>>>>>>>>>
>>>>>>>>>> public void testJobs() throws Exception {
>>>>>>>>>>     StreamExecutionEnvironment env =
>>>>>>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>>>     printf("### first job");
>>>>>>>>>>     env.execute("Job 1");
>>>>>>>>>>
>>>>>>>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>>>     stream2.addSink(new DiscardingSink<>());
>>>>>>>>>>     printf("### second job");
>>>>>>>>>>     env.execute("Job 2");
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Here is the log:
>>>>>>>>>> ### first job
>>>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor
>>>>>>>>>>  - Job 00000000000000000000000000000000 is submitted.
>>>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor
>>>>>>>>>>  - Submitting Job with JobId=00000000000000000000000000000000.
>>>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>>>>> Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>>>>> Submitting job 00000000000000000000000000000000 (job1).
>>>>>>>>>>
>>>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>>>>> Starting ZooKeeperLeaderRetrievalService 
>>>>>>>>>> /leader/resource_manager_lock.
>>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>>>>> execution of job job1 (00000000000000000000000000000000) under job 
>>>>>>>>>> master
>>>>>>>>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>>>>> scheduling with scheduling strategy
>>>>>>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>>>>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  -
>>>>>>>>>> Job job1 (00000000000000000000000000000000) switched from state 
>>>>>>>>>> CREATED to
>>>>>>>>>> RUNNING.
>>>>>>>>>>
>>>>>>>>>> ### second job
>>>>>>>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class -
>>>>>>>>>> IndexWriter : ### second job
>>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>>>>>>>>> ResourceManager address, beginning registration
>>>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>>>>> Starting ZooKeeperLeaderRetrievalService
>>>>>>>>>> /leader/00000000000000000000000000000000/job_manager_lock.
>>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>>>  - Registering job manager b03cde9dc2aebdb39c46cda4c2a94c07
>>>>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for
>>>>>>>>>> job 00000000000000000000000000000000.
>>>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor
>>>>>>>>>>  - Job 00000000000000000000000000000000 was recovered successfully.
>>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>>>  - Registered job manager b03cde9dc2aebdb39c46cda4c2a94c07
>>>>>>>>>> @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for
>>>>>>>>>> job 00000000000000000000000000000000.
>>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>>>>>>>>> successfully registered at ResourceManager, leader id:
>>>>>>>>>> 956d4431ca90d45d92c027046cd0404e.
>>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>>>>> Requesting new slot 
>>>>>>>>>> [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
>>>>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>>>  - Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>>>>> 21134414fc60d4ef3e940609cef960b6.
>>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>>>>> Requesting new slot 
>>>>>>>>>> [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
>>>>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager
>>>>>>>>>>  - Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>>>>> 650bd9100a35ef5086fd4614f5253b55.
>>>>>>>>>>
>>>>>>>>>
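
Note on the execute() versus executeAsync() ordering quoted above from the Flink documentation: below is a minimal, Flink-free sketch of that behaviour in plain Java. It does not use any Flink API. The class SubmissionOrderSketch and the methods executeBlocking, executeNonBlocking, runBlocking and runNonBlocking are hypothetical names made up for illustration, the "jobs" only record start and finish events, and the latches exist solely to make the recorded start order deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Flink-free sketch (hypothetical names): blocking vs. non-blocking submission order. */
public class SubmissionOrderSketch {

    /** Blocking stand-in for execute(): returns only after the "job" has finished. */
    static void executeBlocking(String name, List<String> events) {
        events.add("start " + name);
        events.add("finish " + name);
    }

    /** Sequential submission: job2 cannot start before job1 has finished. */
    static List<String> runBlocking() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        executeBlocking("job1", events);
        executeBlocking("job2", events);
        return events;
    }

    /** Non-blocking stand-in for executeAsync(): returns a future right away. */
    static CompletableFuture<Void> executeNonBlocking(
            String name, List<String> events, CountDownLatch started,
            CountDownLatch mayFinish, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> {
            events.add("start " + name);
            started.countDown();
            try {
                mayFinish.await(); // the "job" keeps running until it is released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("finish " + name);
        }, pool);
    }

    /** Both jobs are submitted, and both start, before either one finishes. */
    static List<String> runNonBlocking() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch mayFinish = new CountDownLatch(1);
        try {
            CountDownLatch started1 = new CountDownLatch(1);
            CompletableFuture<Void> job1 =
                    executeNonBlocking("job1", events, started1, mayFinish, pool);
            started1.await(); // only here to make the recorded order deterministic

            CountDownLatch started2 = new CountDownLatch(1);
            CompletableFuture<Void> job2 =
                    executeNonBlocking("job2", events, started2, mayFinish, pool);
            started2.await();

            mayFinish.countDown(); // now let both "jobs" finish
            CompletableFuture.allOf(job1, job2).join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdownNow();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println("blocking:     " + runBlocking());
        System.out.println("non-blocking: " + runNonBlocking());
    }
}
```

In the blocking case the recorded order is start job1, finish job1, start job2, finish job2. In the non-blocking case both start events are recorded before either finish event. The sketch says nothing about the HA limitation discussed in this thread, which is specific to Flink's application mode.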
