Hi, Thank you for your reply. What I want is flink app has multiple jobs, each job manage a stream. Currently our flink app has only 1 job that manage multiple streams. I did try env.executeAsyc(), but it still doesn't work. From the log, when the second executeAsync() was called, it shows " *Job 00000000000000000000000000000000 was recovered successfully.*" Looks like the second executeAsync() recover the first job. Not start a second job.
Thanks, Qihua On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote: > Hi, > > env.execute("Job 1"); is a blocking call. You either have to use > executeAsync or use a separate thread to submit the second job. If Job 1 > finishes then this would also work by having sequential execution. > > However, I think what you actually want to do is to use the same env with > 2 topologies and 1 single execute like this. > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<String> stream1 = env.addSource(new SourceFunction<String>()); > stream1.addSink(new FlinkKafkaProducer()); > DataStream<String> stream2 = env.addSource(new SourceFunction<String>()); > stream2.addSink(new DiscardingSink<>()); > env.execute("Job 1+2"); > > On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote: > >> Hi, >> Does anyone know how to run multiple jobs in same flink application? >> I did a simple test. First job was started. I did see the log message, >> but I didn't see the second job was started, even I saw the log message. >> >> public void testJobs() throws Exception { >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> DataStream<String> stream1 = env.addSource(new SourceFunction<String>()); >> stream1.addSink(new FlinkKafkaProducer()); >> printf("### first job"); >> env.execute("Job 1"); >> >> env = StreamExecutionEnvironment.getExecutionEnvironment(); >> DataStream<String> stream2 = env.addSource(new SourceFunction<String>()); >> stream2.addSink(new DiscardingSink<>()); >> printf("### second job"); >> env.execute("Job 2"); >> } >> >> Here is the log: >> ### first job >> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >> 00000000000000000000000000000000 is submitted. >> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - >> Submitting Job with JobId=00000000000000000000000000000000. >> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >> Received JobGraph submission 00000000000000000000000000000000 (job1). >> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >> Submitting job 00000000000000000000000000000000 (job1). >> >> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting >> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. >> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution >> of job job1 (00000000000000000000000000000000) under job master id >> b03cde9dc2aebdb39c46cda4c2a94c07. >> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling >> with scheduling strategy >> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] >> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job job1 >> (00000000000000000000000000000000) switched from state CREATED to RUNNING. >> >> ### second job >> WARN com.doordash.flink.common.utils.LoggerUtil - Class - IndexWriter : >> ### second job >> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved >> ResourceManager address, beginning registration >> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting >> ZooKeeperLeaderRetrievalService >> /leader/00000000000000000000000000000000/job_manager_lock. >> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >> 00000000000000000000000000000000. >> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >> 00000000000000000000000000000000 was recovered successfully. >> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >> 00000000000000000000000000000000. >> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager >> successfully registered at ResourceManager, leader id: >> 956d4431ca90d45d92c027046cd0404e. >> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and >> profile ResourceProfile{UNKNOWN} from resource manager. >> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >> Request slot with profile ResourceProfile{UNKNOWN} for job >> 00000000000000000000000000000000 with allocation id >> 21134414fc60d4ef3e940609cef960b6. >> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and >> profile ResourceProfile{UNKNOWN} from resource manager. >> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >> Request slot with profile ResourceProfile{UNKNOWN} for job >> 00000000000000000000000000000000 with allocation id >> 650bd9100a35ef5086fd4614f5253b55. >> >