Hi Min, Is the POJO state in an existing operator or a newly added one? BTW, it would be great if you could share code that reproduces the exception. I need to do more debugging to find the cause, and the code would help.
Tan, Min <min....@ubs.com> wrote on Thu, 8 Jul 2021 at 02:56:

> Hi,
>
> I have followed the steps below in restarting a Flink job with newly modified savepoints.
>
> I can restart a job with new savepoints as long as the Flink state is expressed in Java primitives. When the Flink state is expressed in a POJO, my job does not restart, and I get the exceptions below.
>
> Any ideas? Do I need to redefine any serializers?
>
> Thank you very much for your help in advance.
>
> Regards,
>
> Min
>
> ----------------------------------------- Flink exceptions -----------------------------------------
> 2021-07-07 19:02:58
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>     ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 11 more
> Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
>     at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>     at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
>     at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
>     at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
>     at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
>     at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>     at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>     at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
>     at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
>     at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
>     at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>     ... 15 more
>
> *From:* JING ZHANG <beyond1...@gmail.com>
> *Sent:* 29 June 2021 07:45
> *To:* Marco Villalobos <mvillalo...@kineteque.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* [External] Re: State Processor API and existing state
>
> Hi Marco,
>
> > I assume that all the data within the checkpoint are stored within the given Savepoint. Is that assumption correct?
> Yes.
>
> > I have not figured out how to correct / augment / fix the state though. Can somebody please explain?
> Please try this way:
>
> 1. Load the old savepoint file, creating Savepoint obj1.
> 2. Read the state of the operator with UID Y from the Savepoint obj1 returned by step 1.
> 3. Create a `BootstrapTransformation` via the entry point class `OperatorTransformation`; bootstrap the new operator state with the dataset returned by step 2, correcting or fixing the old state of operator UID Y in a `StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
> 4. Load the old savepoint file again, creating Savepoint obj2.
> 5. Drop the old operator with UID Y by calling `removeOperator` on the Savepoint obj2 returned by step 4.
> 6. Add a new operator with UID Y by calling `withOperator` on the Savepoint obj2 returned by step 4; the first parameter is the uid (Y), the second parameter is the `BootstrapTransformation` returned by step 3.
> 7. Write out the Savepoint obj2 returned by step 6 to a new path.
>
> This way, in the new savepoint files the states of the operators with UIDs W, X, and Z stay intact; only the state of operator Y is updated.
>
> Details about reading/writing/modifying savepoints can be found in the documentation [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/
>
> Best regards,
>
> JING ZHANG
>
> Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, 29 Jun 2021 at 06:00:
>
> > Let's say that a job has operators with UIDs W, X, Y, and Z, and uses RocksDB as a backend with checkpoint data URI s3://checkpoints.
> >
> > Then I stop the job with a savepoint at s3://savepoint-1.
> >
> > I assume that all the data within the checkpoint are stored within the given Savepoint. Is that assumption correct?
> >
> > Then, how can I fix the state in the operator with UID Y, but keep all the data in the other operators intact?
> >
> > I know how to bootstrap state with the state-processor API. I have not figured out how to correct / augment / fix the state though.
> >
> > Can somebody please explain?
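For anyone following the thread, the seven steps JING ZHANG describes above can be sketched roughly as below with the State Processor API of the Flink 1.13 era (DataSet-based). This is only a sketch under assumptions: `MyPojo`, `MyStateReader`, `FixStateFunction`, and the savepoint paths are hypothetical placeholders, not names from this thread, and you would substitute your own reader/bootstrap functions and state backend.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

public class FixSavepointJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: load the old savepoint and read operator Y's keyed state.
        // MyStateReader would extend KeyedStateReaderFunction.
        ExistingSavepoint oldSavepoint =
            Savepoint.load(env, "s3://savepoint-1", new FsStateBackend("s3://checkpoints"));
        DataSet<MyPojo> oldState = oldSavepoint.readKeyedState("Y", new MyStateReader());

        // Step 3: correct the state, then build a BootstrapTransformation from it.
        // FixStateFunction would extend KeyedStateBootstrapFunction.
        DataSet<MyPojo> fixedState = oldState.map(pojo -> { /* fix it here */ return pojo; });
        BootstrapTransformation<MyPojo> transformation = OperatorTransformation
            .bootstrapWith(fixedState)
            .keyBy(pojo -> pojo.key)
            .transform(new FixStateFunction());

        // Steps 4-7: reload the savepoint, swap operator Y, write to a new path.
        Savepoint.load(env, "s3://savepoint-1", new FsStateBackend("s3://checkpoints"))
            .removeOperator("Y")
            .withOperator("Y", transformation)
            .write("s3://savepoint-2");

        env.execute("fix-savepoint");
    }
}
```

Restarting the job from `s3://savepoint-2` should then leave W, X, and Z untouched while Y carries the corrected state.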
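On the "Do I need to redefine any serializers?" question: the innermost exception comes from `KryoSerializerSnapshot.restoreSerializer`, which suggests the POJO state was written with the fallback Kryo serializer rather than Flink's `PojoSerializer`. Without seeing the actual class I can only show the general shape Flink's POJO rules require; `AccountState` and its fields below are invented for illustration and do not appear anywhere in this thread.

```java
// Hypothetical state class illustrating Flink's documented POJO rules.
// If any rule is violated, Flink silently falls back to Kryo, which is
// much more fragile across savepoint/restore and schema changes.
public class AccountState {

    // Rule: fields must be public, or private with public getters/setters,
    // and each field type must itself be serializable by Flink.
    public String accountId;
    public long balanceCents;

    // Rule: the class must be public (and not a non-static inner class)
    // and must have a public no-argument constructor.
    public AccountState() {}

    public AccountState(String accountId, long balanceCents) {
        this.accountId = accountId;
        this.balanceCents = balanceCents;
    }
}
```

If the class cannot be made a conforming POJO, an alternative is to register an explicit serializer on the `ExecutionConfig` so the same serializer is used on both the write and restore sides.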