Hi Khachatryan,

Thanks for the explanation and the input!
1. Use the State Processor API to create a new snapshot [1]

I haven't used it yet, but does the API prevent the class of a specific serializer from being loaded? (I put a rough sketch of what I have in mind at the bottom of this mail.)

2. If the operator has only this state then changing uid (together with allowNonRestoredState) should help

Unfortunately, I have another per-key state defined on the same operator which is very important and cannot be abandoned T.T

3. Probably just changing POJO to an empty class will suffice in your case?

Yeah, I might have to keep the class definition around for a while (see the stub sketched at the bottom of this mail).

Best,

Dongwon

On Tue, Feb 9, 2021 at 2:35 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> I'm pulling Yun Tang who is familiar with StateBackends and RocksDB in particular.
>
> From what I see, the 2nd snapshot (sp2) is built using the same set of states obtained from the starting savepoint/checkpoint (sp1) to write its metadata. This metadata includes serializer snapshots, including the PojoSerializer for your custom type. On restore, this metadata is read, and the POJO class itself is loaded.
>
> I see the following ways to overcome this issue:
> 1. Use the State Processor API to create a new snapshot [1]
> 2. If the operator has only this state then changing uid (together with allowNonRestoredState) should help
> 3. Probably just changing POJO to an empty class will suffice in your case?
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> Regards,
> Roman
>
>
> On Mon, Feb 8, 2021 at 4:31 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi 张静,
>>
>>> Q1: By default, a savepoint restore will try to match all state back to the restored job. `AllowNonRestoredState` does not prevent all state from being read from the savepoint; it only skips matching the restored state back to the new job. So the `ClassNotFoundException` cannot be avoided.
>>
>> Okay.
>>
>>> Q2: Not really. After you recover the new job from the savepoint (savepoint1) based on (1), you can take a new savepoint (savepoint2) and then remove the definition of the POJO type; then you can restore from savepoint2.
>>>
>> I did it, but it ends up with the same ClassNotFoundException :-(
>>
>> What I did exactly is:
>> (1) Trigger sp1 from v1
>> (2) Start v2-1 (w/ the definition of the POJO but not using it at all) from sp1
>> (3) Trigger sp2 from v2-1
>> (4) Start v2-2 (w/o the definition of the POJO) from sp2
>> (5) v2-2 failed with the same ClassNotFoundException regarding the POJO type
>>
>> Should v2-2 successfully start from sp2?
>>
>> Best,
>>
>> Dongwon
>>
>> On Mon, Feb 8, 2021 at 11:48 PM 张静 <beyond1...@gmail.com> wrote:
>>
>>> Hi, Dongwon,
>>> Q1: By default, a savepoint restore will try to match all state back to the restored job. `AllowNonRestoredState` does not prevent all state from being read from the savepoint; it only skips matching the restored state back to the new job. So the `ClassNotFoundException` cannot be avoided.
>>> Q2: Not really. After you recover the new job from the savepoint (savepoint1) based on (1), you can take a new savepoint (savepoint2) and then remove the definition of the POJO type; then you can restore from savepoint2.
>>> Please correct me if I'm wrong. Thanks.
>>>
>>> Best,
>>> Beyond1920
>>>
>>> On Mon, Feb 8, 2021 at 9:43 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > I have an original job (say v1) and I want to start a new job (say v2) from a savepoint of v1.
>>> > >>> > An operator of v1 used to have per-key states of a POJO type, but I >>> want to remove the states together with the definition of the POJO type. >>> > >>> > When I start v2 from a savepoint of v1, I specified >>> "--allowNonRestoredState" but I got the following exception: >>> > >>> > 2021-02-08 22:07:28,324 INFO >>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - >>> input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from >>> RUNNING to FAILED on container_e02_1607261469522_0242_01_000008 @ >>> mobdata-flink-dn29.dakao.io (dataPort=45505). >>> > java.lang.Exception: Exception while creating >>> StreamOperatorStateContext. >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222] >>> > Caused by: org.apache.flink.util.FlinkException: Could not restore >>> keyed state backend for >>> CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from >>> any of the 1 provided restore options. >>> > at >>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > ... 9 more >>> > Caused by: org.apache.flink.runtime.state.BackendBuildingException: >>> Caught unexpected exception. 
>>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > ... 9 more >>> > Caused by: java.io.IOException: Could not find class >>> 'com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc' in >>> classpath. >>> > at >>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:756) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) >>> 
~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > ... 
9 more >>> > Caused by: java.lang.ClassNotFoundException: >>> com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc >>> > at java.net.URLClassLoader.findClass(URLClassLoader.java:382) >>> ~[?:1.8.0_222] >>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_222] >>> > at >>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_222] >>> > at >>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at java.lang.Class.forName0(Native Method) ~[?:1.8.0_222] >>> > at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_222] >>> > at >>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:754) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) >>> 
~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > at >>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) >>> ~[flink-dist_2.11-1.12.1.jar:1.12.1] >>> > >>> > ... 9 more >>> > >>> > >>> > So I make two versions of v2: >>> > 1) v2_1 : remove only key states w/o removing the definition of the >>> POJO type. I can manage to resume from a savepoint of v1. >>> > 2) v2_2 : remove both key states and the definition of the POJO type. >>> I hope resuming from a savepoint of v2_1 could succeed but it fails with >>> the same exception as above. >>> > >>> > Q1) Why doesn't the "--allowNonRestoredState" option suppress >>> ClassNotFoundException? >>> > Q2) Do I have to live forever with the definition of the POJO type >>> which is no longer necessary? >>> > >>> > Best, >>> > >>> > Dongwon >>> > >>> >>
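
P.S. Regarding option 1, here is a rough, untested sketch of what I was imagining with the State Processor API on Flink 1.12. I haven't verified any of it; the uid ("my-uid"), the state name ("important-state") and its type (a ValueState<Long> keyed by String), the paths, and the state backend URI are just placeholders for illustration, not our actual job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class RewriteSavepoint {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/rocksdb-ckpt"); // placeholder

        // Load the existing savepoint. The old POJO class must still be on the
        // classpath of this batch job, because the operator's metadata references
        // its serializer snapshot.
        ExistingSavepoint old = Savepoint.load(env, "file:///tmp/sp2", backend); // placeholder path

        // Read only the state worth keeping from the operator.
        DataSet<Tuple2<String, Long>> kept =
                old.readKeyedState("my-uid", new ImportantStateReader());

        // Re-bootstrap the operator with just that state; its metadata is then
        // written fresh and (I believe) no longer mentions the removed POJO.
        BootstrapTransformation<Tuple2<String, Long>> transformation =
                OperatorTransformation
                        .bootstrapWith(kept)
                        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                            @Override
                            public String getKey(Tuple2<String, Long> value) {
                                return value.f0;
                            }
                        })
                        .transform(new ImportantStateWriter());

        old.removeOperator("my-uid")                // drop the operator's old state entirely
           .withOperator("my-uid", transformation)  // re-add only the kept state
           .write("file:///tmp/sp-rewritten");      // placeholder path

        env.execute("rewrite savepoint");
    }

    /** Reads the per-key state to keep ("important-state", a ValueState<Long>). */
    public static class ImportantStateReader
            extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {

        private transient ValueState<Long> important;

        @Override
        public void open(Configuration parameters) {
            important = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("important-state", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            out.collect(Tuple2.of(key, important.value()));
        }
    }

    /** Writes the same state back under the same name into the rewritten savepoint. */
    public static class ImportantStateWriter
            extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {

        private transient ValueState<Long> important;

        @Override
        public void open(Configuration parameters) {
            important = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("important-state", Types.LONG));
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
            important.update(value.f1);
        }
    }
}

If I read the docs right, readKeyedState still needs the old POJO class on the classpath (its serializer snapshot is part of the operator's metadata), so this batch job would have to run with the v2-1 jar; but the rewritten savepoint should no longer reference that serializer. Since our operator is a KeyedBroadcastProcessFunction, I suppose its broadcast state would also have to be read (readBroadcastState) and re-added the same way. Please correct me if that's wrong.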
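
P.P.S. And regarding option 3, if I understand it right, the stub only has to preserve the exact name the serializer snapshot looks up (com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc), so something as small as this inside the existing IDataConverter class should do (just my guess, not verified):

// In com/kakaomobility/drivinghabit/stream/IDataConverter.java, keep only:
public static class SpeedAndLoc {
    // Intentionally empty. Kept so that the PojoSerializer snapshot stored in
    // old savepoints can still resolve this class on restore; the state itself
    // is no longer registered or used anywhere in v2.
}

Does that match what you had in mind?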