Hi Min,
Is the POJO state in an existing operator or in a newly added operator?
BTW, it would be great if you could share code that reproduces
the exception; I need to debug further based on the code to find the cause.


Tan, Min <min....@ubs.com> wrote on Thu, Jul 8, 2021 at 2:56 AM:

> Hi,
>
>
>
> I have followed the steps below in restarting a Flink job with newly
> modified savepoints.
>
>
>
> I can restart a job with new savepoints as long as the Flink states are
> expressed as Java primitives.
>
> When the Flink states are expressed as a POJO, my job does not restart;
> I get the following exceptions.
>
>
>
> Any ideas? Do I need to redefine any serializers?
>
>
>
> Thank you very much for your help in advance.
>
> Regards,
>
>
>
> Min
>
>
>
>
>
> ----------------------------- Flink exceptions -----------------------------
>
> 2021-07-07 19:02:58
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_086b0e2b116638fe57d2d20eb2517b22_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>     ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>     ... 11 more
> Caused by: java.lang.IllegalStateException: Missing value for the key 'org.apache.flink.state.api.output.TaggedOperatorSubtaskState'
>     at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot.restoreSerializer(KryoSerializerSnapshot.java:86)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>     at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
>     at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
>     at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
>     at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:194)
>     at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
>     at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
>     at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:136)
>     at org.apache.flink.runtime.state.heap.StateTable.getStateSerializer(StateTable.java:315)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:54)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.createStateMap(CopyOnWriteStateTable.java:36)
>     at org.apache.flink.runtime.state.heap.StateTable.<init>(StateTable.java:98)
>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.<init>(CopyOnWriteStateTable.java:49)
>     at org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior.newStateTable(AsyncSnapshotStrategySynchronicityBehavior.java:41)
>     at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.newStateTable(HeapSnapshotStrategy.java:243)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.createOrCheckStateForMetaInfo(HeapRestoreOperation.java:185)
>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:152)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>     ... 15 more
>
>
>
>
>
>
>
> *From:* JING ZHANG <beyond1...@gmail.com>
> *Sent:* 29 June 2021 07:45
> *To:* Marco Villalobos <mvillalo...@kineteque.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* [External] Re: State Processor API and existing state
>
>
>
> Hi Marco,
>
> > I assume that all the data within the checkpoint are stored within the
> given Savepoint. Is that assumption correct?
>
> Yes
>
> > I have not figured out how to correct / augment / fix the state though.
> Can somebody please explain?
>
> Please try this approach:
>
> 1. Load the old savepoint file, creating Savepoint obj1.
>
> 2. Read the state of the operator with UID Y from the Savepoint obj1
> returned in step 1.
>
> 3. Create a `BootstrapTransformation` via the entry point class
> `OperatorTransformation`: bootstrap the new operator state with the dataset
> returned by step 2, correcting or fixing the old state of operator UID Y in a
> `StateBootstrapFunction` or `KeyedStateBootstrapFunction`.
>
> 4. Load the old savepoint file again, creating Savepoint obj2.
>
> 5. Drop the old operator with UID Y by calling `removeOperator` on the
> Savepoint obj2 returned in step 4.
>
> 6. Add a new operator with UID Y by calling `withOperator` on the
> Savepoint obj2 returned in step 4; the first parameter is the uid (Y), the
> second parameter is the `BootstrapTransformation` returned by step 3.
>
> 7. Write out Savepoint obj2 to a new path.
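The steps above could be sketched roughly as follows, assuming Flink 1.13's DataSet-based State Processor API; `MyPojo`, `MyStateReaderFunction`, `MyKeyedStateBootstrapFunction`, the UID "Y", and the s3:// paths are illustrative placeholders, not from this thread:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

public class FixOperatorState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Steps 1-2: load the old savepoint (obj1) and read operator Y's keyed state.
        ExistingSavepoint obj1 =
            Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend());
        DataSet<MyPojo> oldState = obj1.readKeyedState("Y", new MyStateReaderFunction());

        // Step 3: correct the old state, then bootstrap a transformation from it.
        DataSet<MyPojo> fixedState = oldState.map(FixOperatorState::repair);
        BootstrapTransformation<MyPojo> fixedY =
            OperatorTransformation.bootstrapWith(fixedState)
                .keyBy(MyPojo::getKey)
                .transform(new MyKeyedStateBootstrapFunction());

        // Steps 4-7: load the savepoint again (obj2), swap out operator Y,
        // and write the result to a new path. States of all other operators
        // are carried over untouched.
        Savepoint.load(env, "s3://savepoint-1", new MemoryStateBackend())
            .removeOperator("Y")
            .withOperator("Y", fixedY)
            .write("s3://savepoint-2");

        env.execute("fix-operator-Y-state");
    }

    private static MyPojo repair(MyPojo pojo) {
        // ... correct/augment the POJO's fields here ...
        return pojo;
    }
}
```

This is a sketch, not a drop-in program: the reader and bootstrap functions, the key selector, and the state backend must match how the original job declared its state.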
>
>
>
> In this way, the states of the operators with UIDs W, X, and Z are kept
> intact in the new savepoint files; only the state of operator Y is updated.
>
> Details on reading, writing, and modifying savepoints can be found in the documentation [1].
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/libs/state_processor_api/
>
>
>
> Best regards,
>
> JING ZHANG
>
>
>
Marco Villalobos <mvillalo...@kineteque.com> wrote on Tue, Jun 29, 2021 at 6:00 AM:
>
> Let's say that a job has operators with UIDs W, X, Y, and Z, and uses
> RocksDB as a backend with checkpoint data URI s3://checkpoints.
>
>
>
> Then I stop the job with a savepoint at s3://savepoint-1.
>
>
>
> I assume that all the data within the checkpoint are stored within the
> given Savepoint. Is that assumption correct?
>
>
>
> Then, how can I fix the state in the operator with UID Y, but keep all the
> data in the other operators intact?
>
>
>
> I know how to bootstrap state with the state-processor API.
>
> I have not figured out how to correct / augment / fix the state though.
>
>
>
> Can somebody please explain?
>
>
