Hi,

I have followed the steps below to restart a Flink job with newly modified savepoints.
I can restart a job with new savepoints as long as the Flink state is expressed in Java primitives. When the state is expressed as a POJO, the job fails to restart and I get the exceptions below. Any ideas? Do I need to redefine any serializers?

Thank you very much in advance for your help.

Regards,
Min

----------------------------------------- Flink exceptions -----------------------------------------

2021-07-07 19:02:58
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
	at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
	at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
	at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
	at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
	at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
	at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
	at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
	at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 15 more

From: JING ZHANG <beyond1...@gmail.com>
Sent: 29 June 2021 07:45
To: Marco Villalobos <mvillalo...@kineteque.com>
Cc: user <user@flink.apache.org>
Subject: [External] Re: State Processor API and existing state

Hi Marco,

> I assume that all the data within the checkpoint are stored within the given Savepoint. Is that assumption correct?
Yes.

> I have not figured out how to correct / augment / fix the state though. Can somebody please explain?
Please try it this way (a rough code sketch follows after the steps):
1. Load the old savepoint file to create Savepoint obj1.
2. Read the state of the operator with UID Y from the Savepoint obj1 returned by step 1.
3. Create a `BootstrapTransformation` via the entry point class `OperatorTransformation`, bootstrapping the new operator state with the dataset returned by step 2; correct or fix the old state of operator UID Y in a `StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
4. Load the old savepoint file again to create Savepoint obj2.
5. Drop the old operator with UID Y by calling `removeOperator` on the Savepoint obj2 returned by step 4.
6. Add a new operator with UID Y by calling `withOperator` on the Savepoint obj2 returned by step 4; the first parameter is the uid (Y), the second parameter is the `BootstrapTransformation` returned by step 3.
7. Write the Savepoint obj2 resulting from step 6 out to a new path.

This way the states of the operators with UIDs W, X, and Z stay intact in the new savepoint files; only the state of operator Y is updated.

Details on reading, writing, and modifying savepoints can be found in the documentation [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/

Best regards,
JING ZHANG
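A minimal sketch of the seven steps above, assuming Flink 1.13's DataSet-based State Processor API. The POJO type `MyPojo`, its String key field, the state name "my-state", the `ReaderFn`/`BootstrapFn` classes, and the output path s3://savepoint-2 are hypothetical placeholders; the input paths s3://savepoint-1 and s3://checkpoints are taken from the quoted message below.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class FixOperatorYState {

    // Hypothetical POJO held in operator Y's keyed ValueState.
    public static class MyPojo {
        public String key;
        public long count;
        public MyPojo() {}
    }

    // Step 2: reads every key's current value out of operator Y's state.
    public static class ReaderFn extends KeyedStateReaderFunction<String, MyPojo> {
        private transient ValueState<MyPojo> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", MyPojo.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<MyPojo> out) throws Exception {
            out.collect(state.value());
        }
    }

    // Step 3: writes the corrected values back into the bootstrapped operator state.
    public static class BootstrapFn extends KeyedStateBootstrapFunction<String, MyPojo> {
        private transient ValueState<MyPojo> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", MyPojo.class));
        }

        @Override
        public void processElement(MyPojo value, Context ctx) throws Exception {
            value.count = Math.max(value.count, 0L); // apply whatever correction is needed
            state.update(value);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: load the old savepoint and read operator Y's keyed state.
        ExistingSavepoint oldSavepoint =
                Savepoint.load(env, "s3://savepoint-1", new FsStateBackend("s3://checkpoints"));
        DataSet<MyPojo> oldState = oldSavepoint.readKeyedState("Y", new ReaderFn());

        // Step 3: bootstrap a transformation that rewrites operator Y's state.
        BootstrapTransformation<MyPojo> fixedY =
                OperatorTransformation.bootstrapWith(oldState)
                        .keyBy(new KeySelector<MyPojo, String>() {
                            @Override
                            public String getKey(MyPojo pojo) {
                                return pojo.key;
                            }
                        })
                        .transform(new BootstrapFn());

        // Steps 4-7: load the savepoint again, replace operator Y, write to a new path.
        Savepoint.load(env, "s3://savepoint-1", new FsStateBackend("s3://checkpoints"))
                .removeOperator("Y")
                .withOperator("Y", fixedY)
                .write("s3://savepoint-2");

        env.execute("fix-operator-Y-state");
    }
}
```

Because only operator Y is removed and re-added, the rest of the savepoint (operators W, X, and Z) is carried over to the new path untouched.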
Marco Villalobos <mvillalo...@kineteque.com> wrote on Tuesday, 29 June 2021 at 6:00 AM:

Let's say that a job has operators with UIDs W, X, Y, and Z, and uses RocksDB as a backend with checkpoint data URI "s3://checkpoints". Then I stop the job with a savepoint at s3://savepoint-1.

I assume that all the data within the checkpoint are stored within the given Savepoint. Is that assumption correct?

Then, how can I fix the state in the operator with UID Y, but keep all the data in the other operators intact?

I know how to bootstrap state with the state-processor API. I have not figured out how to correct / augment / fix the state, though. Can somebody please explain?