Hi,

I am exploring using S3 to store checkpoints and savepoints.
I can get Flink to write both checkpoints and savepoints to S3.
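For reference, a Flink 1.2 `flink-conf.yaml` setup along these lines is sketched below. It is an illustrative fragment, not an exact copy of my configuration: only the savepoint location `s3://flink-bucket/flink-savepoints` comes from the log, while the checkpoint directory name and the Hadoop configuration path are hypothetical placeholders.

```yaml
# Illustrative flink-conf.yaml fragment for Flink 1.2 (assumed setup).
# Use the filesystem state backend so checkpoint data goes to a file system URI.
state.backend: filesystem
# Hypothetical checkpoint directory in the bucket seen in the log.
state.backend.fs.checkpointdir: s3://flink-bucket/flink-checkpoints
# Default target directory for savepoints (same location as in the log).
state.savepoints.dir: s3://flink-bucket/flink-savepoints
# Hypothetical path to the Hadoop config directory whose core-site.xml
# registers the S3 file system implementation.
fs.hdfs.hadoopconf: /path/to/hadoop/conf
```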

However, when I try to submit the same job from the stored savepoint, it fails
with the exception below.
I am using Flink 1.2 and submitted the job from the web UI dashboard.

Can anyone guide me through this issue?

Thanks,
Abhinav

JobManager logs with the exception:

2017-03-18 00:10:09,193 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.client.JobClient - Checking and uploading JAR files
2017-03-18 00:10:09,348 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
2017-03-18 00:10:09,501 INFO  org.apache.flink.yarn.YarnJobManager - Submitting job 4425245091bea9ad103dd3ff338244bb (Session Counter Example).
2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for 4425245091bea9ad103dd3ff338244bb.
2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Running initialization on master for job Session Counter Example (4425245091bea9ad103dd3ff338244bb).
2017-03-18 00:10:09,502 INFO  org.apache.flink.yarn.YarnJobManager - Successfully ran initialization on master in 0 ms.
2017-03-18 00:10:09,503 INFO  org.apache.flink.yarn.YarnJobManager - Starting job from savepoint 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
2017-03-18 00:10:09,636 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (4425245091bea9ad103dd3ff338244bb) switched from state CREATED to FAILING.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1369)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Invalid path 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createFsInputStream(SavepointStore.java:182)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepoint(SavepointStore.java:131)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:64)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1348)
        ... 10 more
2017-03-18 00:10:09,638 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map (1/1) (f7e8f6c8d2030f5773f9d162d9ac2797) switched from CREATED to CANCELED.
2017-03-18 00:10:09,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(TumblingProcessingTimeWindows(15000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f0aadd59}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: Unnamed (1/1) (7d1917621cf923445ab904bb60c62bfd) switched from CREATED to CANCELED.
2017-03-18 00:10:09,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Session Counter Example (4425245091bea9ad103dd3ff338244bb) if no longer possible.
2017-03-18 00:10:09,639 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (4425245091bea9ad103dd3ff338244bb) switched from state FAILING to FAILED.
2017-03-18 00:10:09,640 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Session Counter Example (4425245091bea9ad103dd3ff338244bb) because a type of SuppressRestartsException was thrown and the restart strategy prevented it.
2017-03-18 00:10:09,640 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 4425245091bea9ad103dd3ff338244bb
2017-03-18 00:10:09,640 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
2017-03-18 00:18:15,290 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
2017-03-18 00:18:15,443 INFO  org.apache.flink.runtime.client.JobClient - Checking and uploading JAR files
2017-03-18 00:18:15,443 INFO  org.apache.flink.runtime.blob.BlobClient - Blob client connecting to akka://flink/user/jobmanager
2017-03-18 00:18:15,596 INFO  org.apache.flink.yarn.YarnJobManager - Submitting job c965addb24f955a28400f89c2a41db57 (Session Counter Example).
2017-03-18 00:18:15,597 INFO  org.apache.flink.yarn.YarnJobManager - Using restart strategy NoRestartStrategy for c965addb24f955a28400f89c2a41db57.
2017-03-18 00:18:15,597 INFO  org.apache.flink.yarn.YarnJobManager - Running initialization on master for job Session Counter Example (c965addb24f955a28400f89c2a41db57).
2017-03-18 00:18:15,597 INFO  org.apache.flink.yarn.YarnJobManager - Successfully ran initialization on master in 0 ms.
2017-03-18 00:18:15,598 INFO  org.apache.flink.yarn.YarnJobManager - Starting job from savepoint 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
2017-03-18 00:18:15,728 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (c965addb24f955a28400f89c2a41db57) switched from state CREATED to FAILING.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1369)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1330)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Invalid path 's3://flink-bucket/flink-savepoints/savepoint-0eba6ba712d2'.
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createFsInputStream(SavepointStore.java:182)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.loadSavepoint(SavepointStore.java:131)
        at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:64)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1348)
        ... 10 more
2017-03-18 00:18:15,729 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map (1/1) (7f4b79ade953f1e75158fc9ef7a197f4) switched from CREATED to CANCELED.
2017-03-18 00:18:15,729 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - TriggerWindow(TumblingProcessingTimeWindows(15000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@f0aadd59}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: Unnamed (1/1) (1f7b169898490ee055446ba42d92a0c2) switched from CREATED to CANCELED.
2017-03-18 00:18:15,729 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Session Counter Example (c965addb24f955a28400f89c2a41db57) if no longer possible.
2017-03-18 00:18:15,729 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Session Counter Example (c965addb24f955a28400f89c2a41db57) switched from state FAILING to FAILED.
2017-03-18 00:18:15,730 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Session Counter Example (c965addb24f955a28400f89c2a41db57) because a type of SuppressRestartsException was thrown and the restart strategy prevented it.
2017-03-18 00:18:15,730 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job c965addb24f955a28400f89c2a41db57
2017-03-18 00:18:15,730 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down

Abhinav Bajaj
Lead Engineer
HERE Predictive Analytics
Office:  +12062092767
Mobile: +17083299516

HERE Seattle
701 Pike Street, #2000, Seattle, WA 98101, USA
47° 36' 41" N. 122° 19' 57" W
HERE Maps


