The job construction itself is a bit complex, but it can either be a StatementSet that's being filled, or there is some kind of conversion Table -> DataStream and then we put the transformations on the DataStream itself. Invocation looks like this:
executionEffect = if (...) FlinkTask.lockedEffect(flink.execute(jobName)) else FlinkTask.lockedEffect(statementSet.execute()) If I don't infinitely block on this, it terminates right after starting the execution: 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not contain a setter for field partitionKey 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ... cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not contain a setter for field stage 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ... cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated configuration key 'akka.client.timeout' instead of proper key 'client.timeout' 2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job 492c9f07d8b3458a52595ab49f636205 is submitted. 2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting Job with JobId=492c9f07d8b3458a52595ab49f636205. 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph submission '....' (492c9f07d8b3458a52595ab49f636205). 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job '....' (492c9f07d8b3458a52595ab49f636205). 2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 . 2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...' (492c9f07d8b3458a52595ab49f636205). 2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=10000) for ... (492c9f07d8b3458a52595ab49f636205). 2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder Running initialization on master for job ... (492c9f07d8b3458a52595ab49f636205). 2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder Successfully ran initialization on master in 0 ms. 2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1 pipelined regions in 0 ms 2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e 2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend loader loads the state backend as HashMapStateBackend 2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint storage is set to 'jobmanager' 2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No checkpoint found during restore. 2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job 492c9f07d8b3458a52595ab49f636205 from savepoint .... (allowing non restored state) 2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400. 2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring job 492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for 492c9f07d8b3458a52595ab49f636205 located at file:.. 2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to restore 2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24 for ... (492c9f07d8b3458a52595ab49f636205). 2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of job '...' (492c9f07d8b3458a52595ab49f636205) under job master id 00000000000000000000000000000000. 2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy] 2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ... (492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING. 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1) (3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED. 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched from CREATED to SCHEDULED. 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched from CREATED to SCHEDULED. 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map -> (Calc(select=[...]) (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from CREATED to SCHEDULED. 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf) switched from CREATED to SCHEDULED. 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124) switched from CREATED to SCHEDULED. 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887) switched from CREATED to SCHEDULED. 2021-12-22 09:25:28,790 INFO o.a.f.r.j.JobMaster Connecting to ResourceManager akka.tcp://flink@localhost :6123/user/rpc/resourcemanager_*(00000000000000000000000000000000) 2021-12-22 09:25:28,794 INFO o.a.f.r.r.RetryingRegistration Resolved ResourceManager address, beginning registration 2021-12-22 09:25:28,796 INFO o.a.f.r.r.ResourceManager Registering job manager 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job 492c9f07d8b3458a52595ab49f636205. 2021-12-22 09:25:28,800 INFO o.a.f.r.r.ResourceManager Registered job manager 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job 492c9f07d8b3458a52595ab49f636205. 2021-12-22 09:25:28,801 INFO o.a.f.r.j.JobMaster JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000. 2021-12-22 09:25:28,803 INFO o.a.f.r.r.s.DeclarativeSlotManager Received resource requirements from job 492c9f07d8b3458a52595ab49f636205: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}] 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Source: ... (1/1) (3c3260f3f0c7d82452a46fc383ceb932) switched from SCHEDULED to DEPLOYING. 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Deploying Source: ... (1/1) (attempt #0) with attempt id 3c3260f3f0c7d82452a46fc383ceb932 to ...:64216-650fc2 @ ... (dataPort=64218) with allocation id 2e63675e30c595a8538f7a006fe0678d 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched from SCHEDULED to DEPLOYING. 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution Deploying c... -> Map -> Calc(select=[...]) -> Map (1/3) (attempt #0) with attempt id 3c05f0bd5ca1bd4903398bb39b5992fa to ... (dataPort=64218) with allocation id 2e63675e30c595a8538f7a006fe0678d 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched from SCHEDULED to DEPLOYING. 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution Deploying ... -> Map -> Calc(select=[...]) -> Map (2/3) (attempt #0) with attempt id 644722a664ac6a9797b8638a225dbbf9 to ... (dataPort=64218) with allocation id fe0a5941283557538901c8a9774a2584 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from SCHEDULED to DEPLOYING. 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution Deploying ... -> Map -> Calc(select=[...]) -> Map (3/3) (attempt #0) with attempt id 00cf0b3a6d...d8b393921 to ... (dataPort=64218) with allocation id 026dabf16a12ddf35399938466a27572 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf) switched from SCHEDULED to DEPLOYING. 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution Deploying ...-batch -> Sink: SnowflakeSinkProvider(...) (1/3) (attempt #0) with attempt id bcaeb5103effbbddc2b4fc7ad801abbf to ... (dataPort=64218) with allocation id 2e63675e30c595a8538f7a006fe0678d 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124) switched from SCHEDULED to DEPLOYING. 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution Deploying ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (2/3) (attempt #0) with attempt id 8d046c60a84900cba31877ec28f81124 to ... (dataPort=64218) with allocation id fe0a5941283557538901c8a9774a2584 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887) switched from SCHEDULED to DEPLOYING. 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution Deploying ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (3/3) (attempt #0) with attempt id 39a1afd89f627816f018fa9652865887 to ... (dataPort=64218) with allocation id 026dabf16a12ddf35399938466a27572 2021-12-22 09:25:28,917 INFO Finished successfully with value: 0 2021-12-22 09:25:28,922 INFO o.a.f.r.e.ClusterEntrypoint Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.. 2021-12-22 09:25:28,923 INFO o.a.f.r.r.RestServerEndpoint Shutting down rest endpoint. 2021-12-22 09:25:28,943 INFO o.a.f.r.b.BlobServer Stopped BLOB server at 0.0.0.0:64213 Process finished with exit code 239 On Wed, Dec 22, 2021 at 8:47 AM Yuval Itzchakov <yuva...@gmail.com> wrote: > I mean it finishes successful and exists with status code 0. Both when > running locally and submitting to the cluster. > > On Wed, Dec 22, 2021, 08:36 Caizhi Weng <tsreape...@gmail.com> wrote: > >> Hi! >> >> By "the streaming job stops" do you mean the job ends with CANCELED state >> instead of FINISHED state? Which kind of job are you running? Is it a >> select job or an insert job? Insert jobs should run continuously once >> they're submitted. Could you share your user code if possible? >> >> Yuval Itzchakov <yuva...@gmail.com> 于2021年12月22日周三 14:11写道: >> >>> Hi Caizhi, >>> >>> If I don't block on statementset.execute, the job finishes immediately >>> with exit code 0 and the streaming job stops, and that's not what I want. I >>> somehow need to block. >>> >>> >>> >>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng <tsreape...@gmail.com> wrote: >>> >>>> Hi! >>>> >>>> You can poll the status of that job with REST API [1]. You can tell >>>> that the job successfully finishes by the FINISHED state and that the job >>>> fails by the FAILED state. >>>> >>>> [1] >>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid >>>> >>>> Yuval Itzchakov <yuva...@gmail.com> 于2021年12月22日周三 02:36写道: >>>> >>>>> Hi, >>>>> >>>>> Flink 1.14.2 >>>>> Scala 2.12 >>>>> >>>>> I have a streaming job that executes and I want to infinitely wait for >>>>> it's completion, or if an exception is thrown during initialization. When >>>>> using *statementSet.execute().await()*, I get an error: >>>>> >>>>> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job >>>>> Result cannot be fetched through the Job Client when in Web Submission.* >>>>> at >>>>> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88) >>>>> at >>>>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54) >>>>> ... 7 more >>>>> >>>>> This is because the Web Submission via the REST API is using >>>>> the WebSubmissionJobClient. >>>>> >>>>> How can I wait on my Flink SQL streaming job when submitting through >>>>> the REST API? >>>>> -- >>>>> Best Regards, >>>>> Yuval Itzchakov >>>>> >>>> -- Best Regards, Yuval Itzchakov.