Hi Nikola, If I remember correctly, state is not compatible between flink-connector-kafka-0.11 and the universal flink-connector-kafka. Piotr (cc'ed) would probably know whats going on here.
Cheers, Gordon On Mon, Aug 10, 2020 at 1:07 PM Nikola Hrusov <n.hru...@gmail.com> wrote: > Hello, > > We are trying to update our kafka connector dependency. So far we have > been using flink-connector-kafka-0.11 and we would like to update the > dependency to flink-connector-kafka. > However, when I try to restart the job with a savepoint I get the > following exception: > > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkException: Could not restore > operator state backend for StreamSink_351727121bb1ca0d704092960989d25b_(1/ > 10) from any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) > ... 5 more > Caused by: org.apache.flink.runtime.state.BackendBuildingException: > Failed when trying to restore operator state backend > at > org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86) > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) > ... 7 more > Caused by: java.io.IOException: Could not find class > 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint' > in classpath. > at > org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:711) > at > org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:681) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:178) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:122) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:125) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:170) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) > at > org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) > at > org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119) > at > org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) > ... 11 more > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 > $NextTransactionalIdHint > at java.net.URLClassLoader.findClass(URLClassLoader.java:382) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:708) > ... 23 more > > > It seems like the state is saved including the classes. Is it possible to > do a migration in some way where we can update the dependency and keep the > state? > > Regards > , > Nikola Hrusov >