Hi, I think you might be running into a problem that is hard to solve with Flink 1.2 and Beam. As you noticed, it's a problem that Beam doesn't assign UIDs to operators, which is a problem. Flink 1.3 and even more Flink 1.4 are a bit more lenient in accepting changes to the graph, so you might have better luck when trying it with that. Did you try using a newer Beam/Flink version? Flink 1.4 should be out next week and shortly after that I'll also update the Beam dependency.
Best, Aljoscha > On 29. Nov 2017, at 23:52, Jins George <jins.geo...@aeris.net> wrote: > > Hi, > > I am running a Beam Pipeline on Flink 1.2 and facing an issue in restoring a > job from checkpoint. If I modify my beam pipeline to add a new operator and > try to restore from the externalized checkpoint, I get the error > > java.lang.IllegalStateException: Invalid Invalid number of operator states. > Found :56. Expected: 58 > at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.checkRestorePreconditions(StreamTask.java:680) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:650) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > > From the savepoint guide [1], new operator added should be initialized > without any state. Any idea why this error is reported. > > Also note, I am not setting a ID to my operator ( because Flink runner in > Beam does set the operator name user provided in pipeline creation) > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html > > <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html> > > > Thanks, > Jins George