Hi,

I have an original job (call it v1), and I want to start a new job (call it
v2) from a savepoint of v1.

One operator of v1 has keyed (per-key) state of a POJO type, and in v2 I
want to remove that state together with the definition of the POJO type.
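For reference, Flink's documentation treats a class as a POJO type when it is public and standalone (a nested class must be public static, as IDataConverter$SpeedAndLoc in the trace below appears to be), has a public no-argument constructor, and exposes every non-static, non-transient field either publicly or through a getter/setter pair. The following is only a rough approximation of those rules using plain reflection, not Flink's own type extraction (it ignores superclass fields and field-type serializability, for instance). The fields of SpeedAndLoc and the NoDefaultCtor class are hypothetical, since the actual definition isn't shown.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoRulesSketch {

    // Hypothetical stand-in for IDataConverter$SpeedAndLoc; the real fields are unknown.
    public static class SpeedAndLoc {
        public double speed;
        public double lat;
        public double lon;

        public SpeedAndLoc() {}
    }

    // Hypothetical counter-example: no public no-argument constructor.
    public static class NoDefaultCtor {
        public double speed;

        public NoDefaultCtor(double speed) { this.speed = speed; }
    }

    // Rough approximation of Flink's documented POJO rules (not Flink's TypeExtractor).
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            return false;
        }
        // A nested class must be static so it can be created without an outer instance.
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            return false;
        }
        try {
            clazz.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Only fields declared on this class are checked; superclasses are ignored here.
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) {
                continue;
            }
            if (!Modifier.isPublic(fm) && !hasGetterAndSetter(clazz, f)) {
                return false;
            }
        }
        return true;
    }

    // True if the class has public getX()/isX() and setX(..) methods matching the field type.
    static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String name = f.getName();
        String cap = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {
            String mn = m.getName();
            if ((mn.equals("get" + cap) || mn.equals("is" + cap))
                    && m.getParameterCount() == 0 && m.getReturnType().equals(f.getType())) {
                getter = true;
            }
            if (mn.equals("set" + cap) && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].equals(f.getType())) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(SpeedAndLoc.class));   // prints true
        System.out.println(looksLikeFlinkPojo(NoDefaultCtor.class)); // prints false
    }
}
```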

When I started v2 from a savepoint of v1 with "--allowNonRestoredState", I
got the following exception:

2021-02-08 22:07:28,324 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - input-to-idata (5/8) (168520e96e73dbf6c2e3c5342b324764) switched from RUNNING to FAILED on container_e02_1607261469522_0242_01_000008 @ mobdata-flink-dn29.dakao.io (dataPort=45505).
java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for CoBroadcastWithKeyedOperator_692eb9021f6319ecb59ea7c8901de92a_(5/8) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:361) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        ... 9 more
Caused by: java.io.IOException: Could not find class 'com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc' in classpath.
        at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:756) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        ... 9 more
Caused by: java.lang.ClassNotFoundException: com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_222]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_222]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_222]
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at java.lang.Class.forName0(Native Method) ~[?:1.8.0_222]
        at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_222]
        at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:754) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:731) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:214) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:129) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(NestedSerializersSnapshotDelegate.java:178) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.readSnapshot(CompositeTypeSerializerSnapshot.java:171) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:175) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:174) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:145) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:77) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:237) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:184) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:211) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:173) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:142) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:284) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
        ... 9 more
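The frames show the failure happening while the RocksDB backend reads the savepoint's state metadata (restoreKVStateMetaData, then readStateMetaInfoSnapshot, then PojoSerializerSnapshotData.readSnapshotData), where the POJO class name stored in the serializer snapshot is looked up through Class.forName. As far as the trace shows, this happens while the operator's state context is being created, before the function gets a chance to register (or skip) any state. The plain-Java sketch below models only that lookup step, assuming the metadata of every state recorded for the operator is read regardless of which states the new job declares. StateMeta, readMetaData, resolveRecordedClass and the state names are hypothetical; this is a toy model, not Flink code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RestoreMetadataSketch {

    // Hypothetical, simplified stand-in for the metadata recorded per state in a savepoint:
    // the state name and the POJO class name kept in that state's serializer snapshot.
    static final class StateMeta {
        final String stateName;
        final String pojoClassName;

        StateMeta(String stateName, String pojoClassName) {
            this.stateName = stateName;
            this.pojoClassName = pojoClassName;
        }
    }

    // Modeled on the InstantiationUtil.resolveClassByName frames: look the class up by name
    // and wrap a ClassNotFoundException in an IOException with the message seen in the log.
    static Class<?> resolveRecordedClass(String className, ClassLoader cl) throws IOException {
        try {
            return Class.forName(className, false, cl);
        } catch (ClassNotFoundException e) {
            throw new IOException("Could not find class '" + className + "' in classpath.", e);
        }
    }

    // Reads the metadata of every recorded state. The loop never asks which states the new
    // job declares, so a single unresolvable class name aborts the whole restore.
    static List<Class<?>> readMetaData(List<StateMeta> recorded, ClassLoader cl) throws IOException {
        List<Class<?>> resolved = new ArrayList<>();
        for (StateMeta meta : recorded) {
            resolved.add(resolveRecordedClass(meta.pojoClassName, cl));
        }
        return resolved;
    }

    public static void main(String[] args) {
        ClassLoader cl = RestoreMetadataSketch.class.getClassLoader();
        List<StateMeta> recorded = Arrays.asList(
                new StateMeta("someOtherState", "java.lang.String"),
                new StateMeta("speedAndLocState",
                        "com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc"));
        try {
            readMetaData(recorded, cl);
            System.out.println("restore succeeded");
        } catch (IOException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
        }
    }
}
```

In this model the second entry fails even though nothing in the "new job" asks for it, which is the same shape as the IOException/ClassNotFoundException pair in the log above.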


So I made two versions of v2:
1) v2_1: removes only the keyed state, without removing the definition of
the POJO type. It resumes successfully from a savepoint of v1.
2) v2_2: removes both the keyed state and the definition of the POJO type. I
hoped that resuming it from a savepoint of v2_1 would succeed, but it fails
with the same exception as above.
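Since v2_2 fails on a savepoint taken from v2_1 with the same ClassNotFoundException, that v2_1 savepoint evidently still records the POJO class for the removed state, even though v2_1 never declares it. The toy model below illustrates that reading in plain Java under two loudly stated assumptions: restoring requires the class of every recorded state to resolve, and restored states the job does not re-declare stay in the backend and are written into the next savepoint. It is not Flink's keyed state backend; the state name "speedAndLocState" and the set-of-names "classpath" are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class SavepointCarryOverSketch {

    static final String POJO = "com.kakaomobility.drivinghabit.stream.IDataConverter$SpeedAndLoc";

    // Hypothetical model: a savepoint maps each state name to the class recorded for it,
    // and the "classpath" is just the set of class names the job jar provides.
    static Map<String, String> restoreThenSnapshot(Map<String, String> savepoint,
                                                   Map<String, String> declaredStates,
                                                   Set<String> classpath) {
        Map<String, String> backend = new LinkedHashMap<>();
        // Restore (assumption): every recorded state's class must resolve, declared or not.
        for (Map.Entry<String, String> e : savepoint.entrySet()) {
            if (!classpath.contains(e.getValue())) {
                throw new IllegalStateException(
                        "Could not find class '" + e.getValue() + "' in classpath.");
            }
            backend.put(e.getKey(), e.getValue());
        }
        // States the job declares are registered too; restored but undeclared ones stay (assumption).
        backend.putAll(declaredStates);
        // Snapshot: everything in the backend is written out again.
        return new LinkedHashMap<>(backend);
    }

    public static void main(String[] args) {
        Map<String, String> v1Savepoint = Collections.singletonMap("speedAndLocState", POJO);

        // v2_1: declares no state, but the POJO class is still in the jar.
        Map<String, String> v21Savepoint = restoreThenSnapshot(
                v1Savepoint, Collections.<String, String>emptyMap(), Collections.singleton(POJO));
        System.out.println("v2_1 savepoint still records: " + v21Savepoint);

        // v2_2: declares no state and the POJO class is gone.
        try {
            restoreThenSnapshot(v21Savepoint, Collections.<String, String>emptyMap(),
                    Collections.<String>emptySet());
        } catch (IllegalStateException e) {
            System.out.println("v2_2 restore fails: " + e.getMessage());
        }
    }
}
```

Under these assumptions, dropping the state declaration alone never removes the entry from later savepoints, which matches what v2_1 and v2_2 show.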

Q1) Why doesn't the "--allowNonRestoredState" option suppress the
ClassNotFoundException?
Q2) Do I have to keep the definition of the POJO type forever, even though
it is no longer needed?

Best,

Dongwon
