Hi Arvid, Do you know if I can start multiple jobs for a single flink application?
Thanks, Qihua On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote: > Hi, > > I am using application mode. > > Thanks, > Qihua > > On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote: > >> Hi Qihua, >> >> Which execution mode are you using? >> >> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote: >> >>> Hi, >>> >>> Thank you for your reply. What I want is flink app has multiple jobs, >>> each job manage a stream. Currently our flink app has only 1 job that >>> manage multiple streams. >>> I did try env.executeAsyc(), but it still doesn't work. From the log, >>> when the second executeAsync() was called, it shows " *Job >>> 00000000000000000000000000000000 was recovered successfully.*" >>> Looks like the second executeAsync() recover the first job. Not start a >>> second job. >>> >>> Thanks, >>> Qihua >>> >>> >>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote: >>> >>>> Hi, >>>> >>>> env.execute("Job 1"); is a blocking call. You either have to use >>>> executeAsync or use a separate thread to submit the second job. If Job 1 >>>> finishes then this would also work by having sequential execution. >>>> >>>> However, I think what you actually want to do is to use the same env >>>> with 2 topologies and 1 single execute like this. >>>> >>>> StreamExecutionEnvironment env = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> DataStream<String> stream1 = env.addSource(new >>>> SourceFunction<String>()); >>>> stream1.addSink(new FlinkKafkaProducer()); >>>> DataStream<String> stream2 = env.addSource(new >>>> SourceFunction<String>()); >>>> stream2.addSink(new DiscardingSink<>()); >>>> env.execute("Job 1+2"); >>>> >>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote: >>>> >>>>> Hi, >>>>> Does anyone know how to run multiple jobs in same flink application? >>>>> I did a simple test. First job was started. I did see the log >>>>> message, but I didn't see the second job was started, even I saw the log >>>>> message. >>>>> >>>>> public void testJobs() throws Exception { >>>>> StreamExecutionEnvironment env = >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>> DataStream<String> stream1 = env.addSource(new >>>>> SourceFunction<String>()); >>>>> stream1.addSink(new FlinkKafkaProducer()); >>>>> printf("### first job"); >>>>> env.execute("Job 1"); >>>>> >>>>> env = StreamExecutionEnvironment.getExecutionEnvironment(); >>>>> DataStream<String> stream2 = env.addSource(new >>>>> SourceFunction<String>()); >>>>> stream2.addSink(new DiscardingSink<>()); >>>>> printf("### second job"); >>>>> env.execute("Job 2"); >>>>> } >>>>> >>>>> Here is the log: >>>>> ### first job >>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >>>>> 00000000000000000000000000000000 is submitted. >>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - >>>>> Submitting Job with JobId=00000000000000000000000000000000. >>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>> Received JobGraph submission 00000000000000000000000000000000 (job1). >>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>>> Submitting job 00000000000000000000000000000000 (job1). >>>>> >>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. >>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>>> execution of job job1 (00000000000000000000000000000000) under job master >>>>> id b03cde9dc2aebdb39c46cda4c2a94c07. >>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>>> scheduling with scheduling strategy >>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] >>>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job >>>>> job1 (00000000000000000000000000000000) switched from state CREATED to >>>>> RUNNING. >>>>> >>>>> ### second job >>>>> WARN com.doordash.flink.common.utils.LoggerUtil - Class - >>>>> IndexWriter : ### second job >>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved >>>>> ResourceManager address, beginning registration >>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>>> Starting ZooKeeperLeaderRetrievalService >>>>> /leader/00000000000000000000000000000000/job_manager_lock. >>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>>>> 00000000000000000000000000000000. >>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >>>>> 00000000000000000000000000000000 was recovered successfully. >>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>>>> 00000000000000000000000000000000. >>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager >>>>> successfully registered at ResourceManager, leader id: >>>>> 956d4431ca90d45d92c027046cd0404e. >>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and >>>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>> Request slot with profile ResourceProfile{UNKNOWN} for job >>>>> 00000000000000000000000000000000 with allocation id >>>>> 21134414fc60d4ef3e940609cef960b6. >>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and >>>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>>> Request slot with profile ResourceProfile{UNKNOWN} for job >>>>> 00000000000000000000000000000000 with allocation id >>>>> 650bd9100a35ef5086fd4614f5253b55. >>>>> >>>>