The job construction itself is a bit complex, but it takes one of two forms:
either a StatementSet is filled, or a Table is converted to a DataStream and
the transformations are applied to the DataStream itself.
The invocation looks like this:

      executionEffect =
        if (...)
          FlinkTask.lockedEffect(flink.execute(jobName))
        else FlinkTask.lockedEffect(statementSet.execute())

If I don't block on this indefinitely, the process terminates right after
starting the execution:

2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not
contain a setter for field partitionKey
2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ...
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance.
2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not
contain a setter for field stage
2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ...
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation
on "Data Types & Serialization" for details of the effect on performance.
2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated
configuration key 'akka.client.timeout' instead of proper key
'client.timeout'
2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job
492c9f07d8b3458a52595ab49f636205 is submitted.
2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting Job
with JobId=492c9f07d8b3458a52595ab49f636205.
2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph
submission '....' (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job '....'
(492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC
endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_2 .
2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...'
(492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using
restart back off time strategy
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1,
backoffTimeMS=10000) for ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder Running
initialization on master for job ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder
Successfully ran initialization on master in 0 ms.
2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1
pipelined regions in 0 ms
2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state backend
has been configured, using default (HashMap)
org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e
2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend
loader loads the state backend as HashMapStateBackend
2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint
storage is set to 'jobmanager'
2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No checkpoint
found during restore.
2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job
492c9f07d8b3458a52595ab49f636205 from savepoint .... (allowing non restored
state)
2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the
checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400.
2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring job
492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for
492c9f07d8b3458a52595ab49f636205 located at file:..
2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to
restore
2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover
strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24
for ... (492c9f07d8b3458a52595ab49f636205).
2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of job
'...' (492c9f07d8b3458a52595ab49f636205) under job master id
00000000000000000000000000000000.
2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting scheduling
with scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ...
(492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING.
2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1)
(3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
(Calc(select=[...]) (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from
CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124)
switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887)
switched from CREATED to SCHEDULED.
2021-12-22 09:25:28,790 INFO o.a.f.r.j.JobMaster Connecting to
ResourceManager akka.tcp://flink@localhost
:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2021-12-22 09:25:28,794 INFO o.a.f.r.r.RetryingRegistration Resolved
ResourceManager address, beginning registration
2021-12-22 09:25:28,796 INFO o.a.f.r.r.ResourceManager Registering job
manager 
00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
for job 492c9f07d8b3458a52595ab49f636205.
2021-12-22 09:25:28,800 INFO o.a.f.r.r.ResourceManager Registered job
manager 
00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
for job 492c9f07d8b3458a52595ab49f636205.
2021-12-22 09:25:28,801 INFO o.a.f.r.j.JobMaster JobManager successfully
registered at ResourceManager, leader id: 00000000000000000000000000000000.
2021-12-22 09:25:28,803 INFO o.a.f.r.r.s.DeclarativeSlotManager Received
resource requirements from job 492c9f07d8b3458a52595ab49f636205:
[ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
numberOfRequiredSlots=3}]
2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Source: ... (1/1)
(3c3260f3f0c7d82452a46fc383ceb932) switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Deploying Source: ...
(1/1) (attempt #0) with attempt id 3c3260f3f0c7d82452a46fc383ceb932 to
...:64216-650fc2 @ ... (dataPort=64218) with allocation id
2e63675e30c595a8538f7a006fe0678d
2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution Deploying c... -> Map ->
Calc(select=[...]) -> Map (1/3) (attempt #0) with attempt id
3c05f0bd5ca1bd4903398bb39b5992fa to ... (dataPort=64218) with allocation id
2e63675e30c595a8538f7a006fe0678d
2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
Calc(select=[...]) -> Map (2/3) (attempt #0) with attempt id
644722a664ac6a9797b8638a225dbbf9 to ... (dataPort=64218) with allocation id
fe0a5941283557538901c8a9774a2584
2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution ... -> Map ->
Calc(select=[...]) -> Map (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched
from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
Calc(select=[...]) -> Map (3/3) (attempt #0) with attempt id
00cf0b3a6d...d8b393921 to ... (dataPort=64218) with allocation id
026dabf16a12ddf35399938466a27572
2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution Deploying ...-batch ->
Sink: SnowflakeSinkProvider(...) (1/3) (attempt #0) with attempt id
bcaeb5103effbbddc2b4fc7ad801abbf to ... (dataPort=64218) with allocation id
2e63675e30c595a8538f7a006fe0678d
2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (2/3)  (8d046c60a84900cba31877ec28f81124)
switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
-> Sink: SnowflakeSinkProvider(...) (2/3)  (attempt #0) with attempt id
8d046c60a84900cba31877ec28f81124 to ... (dataPort=64218) with allocation id
fe0a5941283557538901c8a9774a2584
2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
SnowflakeSinkProvider(...) (3/3)  (39a1afd89f627816f018fa9652865887)
switched from SCHEDULED to DEPLOYING.
2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
-> Sink: SnowflakeSinkProvider(...) (3/3)  (attempt #0) with attempt id
39a1afd89f627816f018fa9652865887 to ... (dataPort=64218) with allocation id
026dabf16a12ddf35399938466a27572
2021-12-22 09:25:28,917 INFO Finished successfully with value: 0
2021-12-22 09:25:28,922 INFO o.a.f.r.e.ClusterEntrypoint Shutting
StandaloneSessionClusterEntrypoint down with application status UNKNOWN.
Diagnostics Cluster entrypoint has been closed externally..
2021-12-22 09:25:28,923 INFO o.a.f.r.r.RestServerEndpoint Shutting down
rest endpoint.
2021-12-22 09:25:28,943 INFO o.a.f.r.b.BlobServer Stopped BLOB server at
0.0.0.0:64213

Process finished with exit code 239
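
For reference, the REST polling suggested further down in this thread could
be sketched roughly as follows. This is only a minimal sketch under
assumptions, not something taken from the job above: it assumes Java 11+
(for java.net.http), a REST endpoint reachable at a base URL such as
http://localhost:8081, and a top-level "state" field in the response of
/jobs/<jobid>. The names JobStatusPoller, parseState, fetchState and
awaitTerminalState are hypothetical, and the regex stands in for a proper
JSON parser.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object JobStatusPoller {
  // Job states treated as terminal here; anything else keeps the loop polling.
  val TerminalStates: Set[String] = Set("FINISHED", "FAILED", "CANCELED")

  // Assumption: the first "state" field in the response is the job state.
  private val StatePattern = "\"state\"\\s*:\\s*\"([A-Z_]+)\"".r

  // Extracts the job state from the JSON body, if one is present.
  def parseState(json: String): Option[String] =
    StatePattern.findFirstMatchIn(json).map(_.group(1))

  // Issues GET <restBase>/jobs/<jobId>; returns None on a non-200 response.
  def fetchState(client: HttpClient, restBase: String, jobId: String): Option[String] = {
    val request = HttpRequest.newBuilder(URI.create(s"$restBase/jobs/$jobId")).GET().build()
    val response = client.send(request, HttpResponse.BodyHandlers.ofString())
    if (response.statusCode() == 200) parseState(response.body()) else None
  }

  // Blocks the calling thread until the job reports a terminal state.
  @annotation.tailrec
  def awaitTerminalState(
      client: HttpClient,
      restBase: String,
      jobId: String,
      pollIntervalMs: Long = 5000L): String =
    fetchState(client, restBase, jobId) match {
      case Some(state) if TerminalStates.contains(state) => state
      case _ =>
        Thread.sleep(pollIntervalMs)
        awaitTerminalState(client, restBase, jobId, pollIntervalMs)
    }
}
```

A caller would create one client with HttpClient.newHttpClient() and pass it
together with the REST base URL and the job id. The loop treats a non-200
response as "not finished yet", so a real version would probably want a
retry limit or a timeout.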

On Wed, Dec 22, 2021 at 8:47 AM Yuval Itzchakov <yuva...@gmail.com> wrote:

> I mean it finishes successfully and exits with status code 0, both when
> running locally and when submitting to the cluster.
>
> On Wed, Dec 22, 2021, 08:36 Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> By "the streaming job stops" do you mean the job ends with CANCELED state
>> instead of FINISHED state? Which kind of job are you running? Is it a
>> select job or an insert job? Insert jobs should run continuously once
>> they're submitted. Could you share your user code if possible?
>>
>> On Wed, Dec 22, 2021, 14:11 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>
>>> Hi Caizhi,
>>>
>>> If I don't block on statementSet.execute(), the job finishes immediately
>>> with exit code 0 and the streaming job stops, which is not what I want. I
>>> somehow need to block.
>>>
>>>
>>>
>>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> You can poll the status of that job with the REST API [1]. The FINISHED
>>>> state tells you that the job completed successfully, and the FAILED
>>>> state tells you that it failed.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
>>>>
>>>> On Wed, Dec 22, 2021, 02:36 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink 1.14.2
>>>>> Scala 2.12
>>>>>
>>>>> I have a streaming job that executes, and I want to wait indefinitely for
>>>>> its completion, or until an exception is thrown during initialization. When
>>>>> using *statementSet.execute().await()*, I get an error:
>>>>>
>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job
>>>>> Result cannot be fetched through the Job Client when in Web Submission.*
>>>>> at
>>>>> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
>>>>> at
>>>>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
>>>>> ... 7 more
>>>>>
>>>>> This is because the Web Submission via the REST API is using
>>>>> the WebSubmissionJobClient.
>>>>>
>>>>> How can I wait on my Flink SQL streaming job when submitting through
>>>>> the REST API?
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov
>>>>>
>>>>

-- 
Best Regards,
Yuval Itzchakov.
