Hi, env.execute("Job 1"); is a blocking call. You either have to use executeAsync or use a separate thread to submit the second job. If Job 1 finishes then this would also work by having sequential execution.
However, I think what you actually want to do is to use the same env with 2 topologies and 1 single execute like this. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> stream1 = env.addSource(new SourceFunction<String>()); stream1.addSink(new FlinkKafkaProducer()); DataStream<String> stream2 = env.addSource(new SourceFunction<String>()); stream2.addSink(new DiscardingSink<>()); env.execute("Job 1+2"); On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote: > Hi, > Does anyone know how to run multiple jobs in same flink application? > I did a simple test. First job was started. I did see the log message, > but I didn't see the second job was started, even I saw the log message. > > public void testJobs() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<String> stream1 = env.addSource(new SourceFunction<String>()); > stream1.addSink(new FlinkKafkaProducer()); > printf("### first job"); > env.execute("Job 1"); > > env = StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<String> stream2 = env.addSource(new SourceFunction<String>()); > stream2.addSink(new DiscardingSink<>()); > printf("### second job"); > env.execute("Job 2"); > } > > Here is the log: > ### first job > INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job > 00000000000000000000000000000000 is submitted. > INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - > Submitting Job with JobId=00000000000000000000000000000000. > INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received > JobGraph submission 00000000000000000000000000000000 (job1). > INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - > Submitting job 00000000000000000000000000000000 (job1). > > INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting > ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. > INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution > of job job1 (00000000000000000000000000000000) under job master id > b03cde9dc2aebdb39c46cda4c2a94c07. > INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling > with scheduling strategy > [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] > INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job job1 > (00000000000000000000000000000000) switched from state CREATED to RUNNING. > > ### second job > WARN com.doordash.flink.common.utils.LoggerUtil - Class - IndexWriter : > ### second job > INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved > ResourceManager address, beginning registration > INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting > ZooKeeperLeaderRetrievalService > /leader/00000000000000000000000000000000/job_manager_lock. > INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - > Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// > flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job > 00000000000000000000000000000000. > INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job > 00000000000000000000000000000000 was recovered successfully. > INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - > Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// > flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job > 00000000000000000000000000000000. > INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager > successfully registered at ResourceManager, leader id: > 956d4431ca90d45d92c027046cd0404e. > INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - > Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and > profile ResourceProfile{UNKNOWN} from resource manager. > INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - > Request slot with profile ResourceProfile{UNKNOWN} for job > 00000000000000000000000000000000 with allocation id > 21134414fc60d4ef3e940609cef960b6. > INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - > Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and > profile ResourceProfile{UNKNOWN} from resource manager. > INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - > Request slot with profile ResourceProfile{UNKNOWN} for job > 00000000000000000000000000000000 with allocation id > 650bd9100a35ef5086fd4614f5253b55. >