Hi Lee,

A quick question: which version of Flink are you using when testing
execution.state-recovery.path?
It looks like this config option is only supported in Flink 1.20
<https://issues.apache.org/jira/browse/FLINK-34454>, which has not been
released yet.
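
If I am reading FLINK-34454 correctly, execution.state-recovery.path is the
1.20 rename of the pre-existing execution.savepoint.path option, so on a
released version (e.g. 1.18/1.19) you could try the older key instead. A
minimal sketch, reusing the savepoint path from your STOP JOB output:

```sql
-- Pre-1.20 SQL Client: point subsequent job submissions at the savepoint
SET 'execution.savepoint.path' = 'file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';

-- Jobs submitted after this SET should restore from that savepoint
INSERT INTO OrdersBlackhole SELECT * FROM Orders;
```

If the restore takes effect, the jobmanager log should mention restoring
state from the savepoint rather than "No checkpoint found during restore."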


Best,
Biao Geng


Lee, Keith <lee...@amazon.co.uk> 于2024年4月26日周五 04:51写道:

> Apologies, I included the jobmanager log for
> 6969725a69ecc967aac2ce3eedcc274a instead of
> 7881d53d28751f9bbbd3581976d9fe3d; however, they look exactly the same.
>
> I can include it if necessary.
>
> Thanks
> Keith
>
>
>
> *From: *"Lee, Keith" <lee...@amazon.co.uk>
> *Date: *Thursday, 25 April 2024 at 21:41
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Flink SQL Client does not start job with savepoint
>
>
>
> Hi,
>
> Referring to
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
>
>
> I’ve followed the instructions; however, I do not see evidence of the job
> being started from the savepoint. See the SQL statement excerpt below:
>
>
>
> Flink SQL> STOP JOB '14de8cc898d56653b96872fc0ba03c91' WITH SAVEPOINT;
>
> +----------------------------------------------------------+
>
> |                                           savepoint path |
>
> +----------------------------------------------------------+
>
> | file:/tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc |
>
> +----------------------------------------------------------+
>
>
> …
>
> Flink SQL> CREATE TABLE Orders (order_number BIGINT,price
> DECIMAL(32,2),buyer ROW<first_name STRING, last_name STRING>,order_time
> TIMESTAMP(3)) WITH ('connector' = 'datagen');
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> CREATE TABLE OrdersBlackhole (order_number BIGINT,price
> DECIMAL(32,2),buyer ROW<first_name STRING, last_name STRING>,order_time
> TIMESTAMP(3)) WITH ('connector' = 'blackhole');
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;
>
> *[INFO] Submitting SQL update statement to the cluster...*
>
> *[INFO] SQL update statement has been successfully submitted to the
> cluster:*
>
> Job ID: 6969725a69ecc967aac2ce3eedcc274a
>
>
>
>
>
> Flink SQL> STOP JOB '6969725a69ecc967aac2ce3eedcc274a';
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> SET 'execution.state-recovery.path' = '
> file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;
>
> *[INFO] Submitting SQL update statement to the cluster...*
>
> *[INFO] SQL update statement has been successfully submitted to the
> cluster:*
>
> Job ID: 7881d53d28751f9bbbd3581976d9fe3d
>
>
>
>
> I have attempted the path both with and without the file:// and file:/
> prefixes. Additionally, I’ve also tried the following in config.yml:
>
> state.savepoints.dir: file:///tmp/flink-savepoints/
>
> state.checkpoints.dir: file:///tmp/flink-checkpoints/
>
>
> Am I missing something? The jobmanager log does not indicate a restore from
> the savepoint:
>
>
> Received JobGraph submission
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Submitting job
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a).
>
> JobMasterServiceLeadershipRunner for job 6969725a69ecc967aac2ce3eedcc274a
> was granted leadership with leader id 00000000-0000-0000-0000-000000000000.
> Creating new JobMasterServiceProcess.
>
> Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> pekko://flink/user/rpc/jobmanager_4 .
>
> Initializing job
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Using restart back off time strategy NoRestartBackoffTimeStrategy for
> insert-into_default_catalog.default_database.OrdersBlackhole
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Created execution graph 9905f321e9958b6c36b71e0601a85a59 for job
> 6969725a69ecc967aac2ce3eedcc274a.
>
> Running initialization on master for job
> insert-into_default_catalog.default_database.OrdersBlackhole
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Successfully ran initialization on master in 0 ms.
>
> Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
>
> State backend is set to heap memory
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@78e93599
>
> State backend loader loads the state backend as HashMapStateBackend
>
> Using job/cluster config to configure application-defined checkpoint
> storage:
> org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@acb26a25
>
> No checkpoint found during restore.
>
> Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionFailoverStrategy@7db68f8f
> for insert-into_default_catalog.default_database.OrdersBlackhole
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Starting execution of job
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a) under job master id
> 00000000000000000000000000000000.
>
> Starting scheduling with scheduling strategy
> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
>
> Job insert-into_default_catalog.default_database.OrdersBlackhole
> (6969725a69ecc967aac2ce3eedcc274a) switched from state CREATED to RUNNING.
>
>
> Thanks in advance,
> Keith
>
>
>
