Hi Lei,

setting explicit operator IDs should solve this issue. As far as I know, in earlier versions of Flink the auto-generated operator ID also depended on the operator parallelism (I'm not sure up to which version that was the case).
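To illustrate why an explicit ID helps, here is a toy model in plain Java. It is NOT Flink's actual hashing scheme: `autoId` and `explicitId` are hypothetical helpers, and MD5 is used only because it ships with every JDK. The model assumes, as described above for older versions, that the auto-generated ID mixes in the operator's parallelism. In a real Flink job the explicit ID is set by calling `uid(String)` on each operator, e.g. `stream.map(new MyMapper()).uid("parse-events")`, where `MyMapper` and `"parse-events"` are placeholder names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Toy model (not Flink's real hashing) of why restoring a savepoint can fail
 * after a parallelism change when operator IDs are auto-generated.
 */
public class OperatorIdDemo {

    // Hex-encodes an MD5 digest of the input (32 hex chars, i.e. 128 bits).
    // MD5 is chosen only because it is available in every JDK.
    static String hash(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical auto-generated ID: derived from the operator's position in
    // the job graph AND its parallelism (the assumed older behaviour).
    static String autoId(int position, int parallelism) {
        return hash("pos=" + position + ";p=" + parallelism);
    }

    // Hypothetical explicit ID: derived only from the user-assigned uid string,
    // so rescaling the job does not change it.
    static String explicitId(String uid) {
        return hash("uid=" + uid);
    }

    public static void main(String[] args) {
        // Same operator, same program; only the parallelism changes from 4 to 8.
        String autoBefore = autoId(3, 4);
        String autoAfter = autoId(3, 8);
        System.out.println("auto IDs match: " + autoBefore.equals(autoAfter));

        // With an explicit uid, the ID survives the rescale, so the savepoint
        // state can still be mapped to the operator.
        String uidBefore = explicitId("parse-events");
        String uidAfter = explicitId("parse-events");
        System.out.println("explicit IDs match: " + uidBefore.equals(uidAfter));
    }
}
```

Running `main` prints `auto IDs match: false` followed by `explicit IDs match: true`: in this model the auto-generated ID stored in the savepoint no longer exists in the rescaled program, which is the situation the "operator is not available in the new program" error describes.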
Which version are you running?

Best,
Fabian

2017-10-17 3:15 GMT+02:00 Lei Chen <leyn...@gmail.com>:
> Hi,
>
> We're trying to implement a module to help autoscale our pipeline, which is built with Flink on YARN. According to the documentation, the suggested procedure seems to be:
>
> 1. cancel the job with a savepoint
> 2. start a new job with an increased number of YARN TaskManagers (TMs) and a higher parallelism.
>
> However, step 2 always fails with this error:
>
> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
>     at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The procedure worked fine if the parallelism was not changed.
>
> I also want to mention that I didn't manually specify operator IDs in my job. The documentation does mention that manually assigning operator IDs is recommended; I'm just curious whether that is mandatory to fix the problem I'm seeing, given that my program doesn't change at all, so the auto-generated operator IDs should stay the same after the parallelism increase?
>
> thanks,
> Lei