Hi Qihua, Which execution mode are you using?
On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote: > Hi, > > Thank you for your reply. What I want is flink app has multiple jobs, each > job manage a stream. Currently our flink app has only 1 job that manage > multiple streams. > I did try env.executeAsyc(), but it still doesn't work. From the log, when > the second executeAsync() was called, it shows " *Job > 00000000000000000000000000000000 was recovered successfully.*" > Looks like the second executeAsync() recover the first job. Not start a > second job. > > Thanks, > Qihua > > > On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote: > >> Hi, >> >> env.execute("Job 1"); is a blocking call. You either have to use >> executeAsync or use a separate thread to submit the second job. If Job 1 >> finishes then this would also work by having sequential execution. >> >> However, I think what you actually want to do is to use the same env with >> 2 topologies and 1 single execute like this. >> >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> DataStream<String> stream1 = env.addSource(new SourceFunction<String>()); >> stream1.addSink(new FlinkKafkaProducer()); >> DataStream<String> stream2 = env.addSource(new SourceFunction<String>()); >> stream2.addSink(new DiscardingSink<>()); >> env.execute("Job 1+2"); >> >> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote: >> >>> Hi, >>> Does anyone know how to run multiple jobs in same flink application? >>> I did a simple test. First job was started. I did see the log message, >>> but I didn't see the second job was started, even I saw the log message. >>> >>> public void testJobs() throws Exception { >>> StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>()); >>> stream1.addSink(new FlinkKafkaProducer()); >>> printf("### first job"); >>> env.execute("Job 1"); >>> >>> env = StreamExecutionEnvironment.getExecutionEnvironment(); >>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>()); >>> stream2.addSink(new DiscardingSink<>()); >>> printf("### second job"); >>> env.execute("Job 2"); >>> } >>> >>> Here is the log: >>> ### first job >>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >>> 00000000000000000000000000000000 is submitted. >>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - >>> Submitting Job with JobId=00000000000000000000000000000000. >>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>> Received JobGraph submission 00000000000000000000000000000000 (job1). >>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>> Submitting job 00000000000000000000000000000000 (job1). >>> >>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. >>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution >>> of job job1 (00000000000000000000000000000000) under job master id >>> b03cde9dc2aebdb39c46cda4c2a94c07. >>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>> scheduling with scheduling strategy >>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] >>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job job1 >>> (00000000000000000000000000000000) switched from state CREATED to RUNNING. >>> >>> ### second job >>> WARN com.doordash.flink.common.utils.LoggerUtil - Class - IndexWriter >>> : ### second job >>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved >>> ResourceManager address, beginning registration >>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>> Starting ZooKeeperLeaderRetrievalService >>> /leader/00000000000000000000000000000000/job_manager_lock. >>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>> 00000000000000000000000000000000. >>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >>> 00000000000000000000000000000000 was recovered successfully. >>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>> 00000000000000000000000000000000. >>> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager >>> successfully registered at ResourceManager, leader id: >>> 956d4431ca90d45d92c027046cd0404e. >>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and >>> profile ResourceProfile{UNKNOWN} from resource manager. >>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>> Request slot with profile ResourceProfile{UNKNOWN} for job >>> 00000000000000000000000000000000 with allocation id >>> 21134414fc60d4ef3e940609cef960b6. >>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and >>> profile ResourceProfile{UNKNOWN} from resource manager. >>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>> Request slot with profile ResourceProfile{UNKNOWN} for job >>> 00000000000000000000000000000000 with allocation id >>> 650bd9100a35ef5086fd4614f5253b55. >>> >>