(adding back user mailing list)

Yes, that is correct: Flink 1.8.0 is causing the problem here. The upgrade path should be:

1. Upgrade Flink to 1.11.1 without upgrading the connector
2. Take a new savepoint
3. Upgrade the connector to the universal one
4. Restore the upgraded job from the new savepoint (from step 2)
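
As a sketch, steps 2 and 4 map onto the Flink CLI roughly like this (the job ID, savepoint directory, entry class, and jar name below are placeholders, not values from your setup):

```shell
# Step 2: trigger a savepoint for the running job (still on the old
# connector). <job-id> and the target directory are placeholders.
flink savepoint <job-id> hdfs:///savepoints

# Steps 3-4: after rebuilding the job against the universal
# flink-connector-kafka, resubmit it from that savepoint.
flink run -s hdfs:///savepoints/savepoint-<id> \
    -c com.example.MyJob my-job.jar
```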

If it doesn't work, please let us know.

Piotrek

On Wed, 12 Aug 2020 at 09:53 Nikola Hrusov <n.hru...@gmail.com> wrote:

> Hi Piotr,
>
> We are running 1.8.0 and we updated it to 1.11.1. Apart from the flink
> version, we updated the scala version from 2.11 to 2.12 and the kafka
> connector. Those are the only changes done to the job.
> When I try to restart the job from a savepoint then I get this error.
>
> If I understand correctly, we should first upgrade to 1.11.1 (without
> upgrading the kafka connectors) and then, after we run 1.11.1, we should
> upgrade the kafka connectors?
>
> Regards,
> Nikola Hrusov
>
> On Wed, Aug 12, 2020 at 11:49 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi Nikola,
>>
>> Which Flink version are you using? Can you describe step by step what you
>> are doing?
>>
>> This error that you have should have been fixed in Flink 1.9.0+ [1], so
>> if you are using an older version of Flink, please first upgrade Flink -
>> without upgrading the job, then upgrade the connector.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-11249
>>
>> On Mon, 10 Aug 2020 at 08:14 Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi Nikola,
>>>
>>> If I remember correctly, state is not compatible between
>>> flink-connector-kafka-0.11 and the universal flink-connector-kafka.
>>> Piotr (cc'ed) would probably know what's going on here.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Mon, Aug 10, 2020 at 1:07 PM Nikola Hrusov <n.hru...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> We are trying to update our kafka connector dependency. So far we have
>>>> been using flink-connector-kafka-0.11 and we would like to update the
>>>> dependency to flink-connector-kafka.
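>>>> For reference, the dependency change is roughly the following (Maven
>>>> coordinates; versions and the Scala 2.12 suffix taken from our
>>>> upgrade, so a sketch rather than our exact build file):
>>>>
>>>> ```xml
>>>> <!-- before: 0.11-specific connector, Scala 2.11, Flink 1.8.0 -->
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>>>>   <version>1.8.0</version>
>>>> </dependency>
>>>>
>>>> <!-- after: universal connector, Scala 2.12, Flink 1.11.1 -->
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-connector-kafka_2.12</artifactId>
>>>>   <version>1.11.1</version>
>>>> </dependency>
>>>> ```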
>>>> However, when I try to restart the job with a savepoint I get the
>>>> following exception:
>>>>
>>>> java.lang.Exception: Exception while creating
>>>> StreamOperatorStateContext.
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>> operator state backend for StreamSink_351727121bb1ca0d704092960989d25b
>>>> _(1/10) from any of the 1 provided restore options.
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>>>> ... 5 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>> Failed when trying to restore operator state backend
>>>> at
>>>> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>>> ... 7 more
>>>> Caused by: java.io.IOException: Could not find class
>>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint'
>>>> in classpath.
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:711)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:681)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:178)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:122)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:125)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:170)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>>>> at
>>>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>>>> at
>>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>>>> at
>>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>>>> at
>>>> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>>>> ... 11 more
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
>>>> $NextTransactionalIdHint
>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> at
>>>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Class.java:348)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:708)
>>>> ... 23 more
>>>>
>>>>
>>>> It seems the savepoint references the connector classes by name. Is it
>>>> possible to do a migration in some way where we can update the
>>>> dependency and keep the state?
>>>>
>>>> Regards,
>>>> Nikola Hrusov
>>>>
>>>
