Hi Lee,

A quick question: which version of Flink are you using when testing execution.state-recovery.path? It looks like this config option is only supported in Flink 1.20 <https://issues.apache.org/jira/browse/FLINK-34454>, which has not been released yet.
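If I remember correctly, on Flink 1.19 and earlier the corresponding option is named execution.savepoint.path, so one thing worth trying is the older option name. A sketch below, reusing the savepoint path from your transcript (please double-check the option name against the docs for your exact version):

```sql
-- Assumption: running Flink 1.19 or earlier, where the restore option is
-- 'execution.savepoint.path' (renamed to 'execution.state-recovery.path' later).
SET 'execution.savepoint.path' = 'file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';

-- Resubmit the job; it should then restore from the savepoint above.
INSERT INTO OrdersBlackhole SELECT * FROM Orders;
```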
Best,
Biao Geng

Lee, Keith <lee...@amazon.co.uk> wrote on Fri, 26 Apr 2024 at 04:51:

> Apologies, I have included the jobmanager log for
> 6969725a69ecc967aac2ce3eedcc274a instead of
> 7881d53d28751f9bbbd3581976d9fe3d; however, they looked exactly the same.
>
> Can include if necessary.
>
> Thanks
> Keith
>
> From: "Lee, Keith" <lee...@amazon.co.uk>
> Date: Thursday, 25 April 2024 at 21:41
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Flink SQL Client does not start job with savepoint
>
> Hi,
>
> Referring to
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
>
> I've followed the instructions; however, I do not see evidence of the job
> being started with a savepoint. See the SQL statement excerpt below:
>
> Flink SQL> STOP JOB '14de8cc898d56653b96872fc0ba03c91' WITH SAVEPOINT;
> +----------------------------------------------------------+
> |                                           savepoint path |
> +----------------------------------------------------------+
> | file:/tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc |
> +----------------------------------------------------------+
>
> ...
>
> Flink SQL> CREATE TABLE Orders (
>     order_number BIGINT,
>     price DECIMAL(32,2),
>     buyer ROW<first_name STRING, last_name STRING>,
>     order_time TIMESTAMP(3)
> ) WITH ('connector' = 'datagen');
> [INFO] Execute statement succeed.
>
> Flink SQL> CREATE TABLE OrdersBlackhole (
>     order_number BIGINT,
>     price DECIMAL(32,2),
>     buyer ROW<first_name STRING, last_name STRING>,
>     order_time TIMESTAMP(3)
> ) WITH ('connector' = 'blackhole');
> [INFO] Execute statement succeed.
>
> Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] SQL update statement has been successfully submitted to the cluster:
> Job ID: 6969725a69ecc967aac2ce3eedcc274a
>
> Flink SQL> STOP JOB '6969725a69ecc967aac2ce3eedcc274a';
> [INFO] Execute statement succeed.
>
> Flink SQL> SET 'execution.state-recovery.path' =
>     'file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';
> [INFO] Execute statement succeed.
>
> Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;
> [INFO] Submitting SQL update statement to the cluster...
> [INFO] SQL update statement has been successfully submitted to the cluster:
> Job ID: 7881d53d28751f9bbbd3581976d9fe3d
>
> I have attempted with and without the prefixes file:// and file:/.
> Additionally, I have also attempted the following in config.yml:
>
> state.savepoints.dir: file:///tmp/flink-savepoints/
> state.checkpoints.dir: file:///tmp/flink-checkpoints/
>
> Am I missing something? The jobmanager log did not indicate a start from
> savepoint:
>
> Received JobGraph submission 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a).
> Submitting job 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a).
> JobMasterServiceLeadershipRunner for job 6969725a69ecc967aac2ce3eedcc274a was granted leadership with leader id 00000000-0000-0000-0000-000000000000. Creating new JobMasterServiceProcess.
> Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at pekko://flink/user/rpc/jobmanager_4 .
> Initializing job 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a).
> Using restart back off time strategy NoRestartBackoffTimeStrategy for insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a).
> Created execution graph 9905f321e9958b6c36b71e0601a85a59 for job 6969725a69ecc967aac2ce3eedcc274a.
> Running initialization on master for job insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a).
> Successfully ran initialization on master in 0 ms.
> Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
> State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@78e93599
> State backend loader loads the state backend as HashMapStateBackend
> Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@acb26a25
> No checkpoint found during restore.
> Using failover strategy org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionFailoverStrategy@7db68f8f for insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a).
> Starting execution of job 'insert-into_default_catalog.default_database.OrdersBlackhole' (6969725a69ecc967aac2ce3eedcc274a) under job master id 00000000000000000000000000000000.
> Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> Job insert-into_default_catalog.default_database.OrdersBlackhole (6969725a69ecc967aac2ce3eedcc274a) switched from state CREATED to RUNNING.
>
> Thanks in advance,
> Keith