Hello!

I am using Beam 2.30.0 with the Flink runner (Flink 1.11).

The app receives input data as POJOs, groups them into fixed windows, and
then writes the contents of each window to files on AWS S3.

As the data evolves, I want to add new fields to my POJOs (which are
actually Avro-generated classes). Since the new POJO is fully compatible
with the old one, I would expect the state to be restored successfully once
I deploy the changed application code.
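In other words, bytes written by the old code should remain readable by the
new code, with added fields falling back to a default. As an illustration of
that property only, here is a stdlib-only Java sketch with a hypothetical
MyRecord class and a hand-rolled version tag. It is not how Avro, Beam, or
Flink implement evolution (Avro resolves the writer's schema against the
reader's schema instead), and the encode/decode methods merely mirror the
shape of Beam's CustomCoder without extending it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class VersionedCodecSketch {
    // Hypothetical record: "id" and "count" existed in version 1,
    // "source" was added in version 2 with a default value.
    static final class MyRecord {
        final String id;
        final long count;
        final String source;

        MyRecord(String id, long count, String source) {
            this.id = id;
            this.count = count;
            this.source = source;
        }
    }

    static final int CURRENT_VERSION = 2;
    static final String DEFAULT_SOURCE = "unknown";

    // Writes a version tag, then all fields of the current layout.
    static void encode(MyRecord value, OutputStream outStream) throws IOException {
        DataOutputStream out = new DataOutputStream(outStream);
        out.writeByte(CURRENT_VERSION);
        out.writeUTF(value.id);
        out.writeLong(value.count);
        out.writeUTF(value.source);
        out.flush();
    }

    // Reads the layout announced by the tag; fields that did not exist
    // in that version get their default value.
    static MyRecord decode(InputStream inStream) throws IOException {
        DataInputStream in = new DataInputStream(inStream);
        int version = in.readUnsignedByte();
        if (version < 1 || version > CURRENT_VERSION) {
            throw new IOException("Unsupported version: " + version);
        }
        String id = in.readUTF();
        long count = in.readLong();
        String source = version >= 2 ? in.readUTF() : DEFAULT_SOURCE;
        return new MyRecord(id, count, source);
    }

    // Produces bytes the way the old (version 1) code would have written them.
    static byte[] encodeV1(String id, long count) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(1);
        out.writeUTF(id);
        out.writeLong(count);
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Old bytes, new code: the added field falls back to its default.
        MyRecord old = decode(new ByteArrayInputStream(encodeV1("a", 3)));
        System.out.println(old.id + " " + old.count + " " + old.source); // a 3 unknown

        // New bytes, new code: a plain round trip.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(new MyRecord("b", 5, "s3"), bytes);
        MyRecord current = decode(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(current.id + " " + current.count + " " + current.source); // b 5 s3
    }
}
```

The version tag plays the role that the writer's schema plays in Avro: the
reader has to know how the bytes were written before it can fill in what is
missing.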

I have been looking for code examples that solve this case, but couldn't find
any, nor could I find a section of the documentation that discusses the
evolution of stateful processors.

I have tried the following so far:
- not specifying any coders (as my Avro-generated POJOs are already
Serializable)
- using KryoCoder (mentioned in
https://beam.apache.org/documentation/sdks/java/euphoria/)
- using AvroCoder
(https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/AvroCoder.html)
- implementing my own coder (see below)

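import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;

// MyPoJo is the Avro-generated class; it provides writeExternal/readExternal.
public class MyPoJoCoder extends CustomCoder<MyPoJo> {
    @Override
    public void encode(MyPoJo value, OutputStream outStream)
            throws CoderException, IOException {
        // Delegate the encoding to the Avro-generated writeExternal method.
        ObjectOutputStream oos = new ObjectOutputStream(outStream);
        value.writeExternal(oos);
        // ObjectOutputStream buffers its output: flush so that every byte
        // reaches outStream (the stream itself must not be closed here).
        oos.flush();
    }

    @Override
    public MyPoJo decode(InputStream inStream)
            throws CoderException, IOException {
        // Delegate the decoding to the Avro-generated readExternal method.
        MyPoJo value = new MyPoJo();
        ObjectInputStream ois = new ObjectInputStream(inStream);
        value.readExternal(ois);
        return value;
    }
}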

but I am still getting the following exception:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_3ae0e54ab75888aa89ffd50dca3abb31_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more
Caused by: java.io.InvalidClassException: com.mymodels.MyPoJo; local class incompatible: stream classdesc serialVersionUID = -4766138050980652522, local class serialVersionUID = -3493429883367292394
    at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2012)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
    at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1825)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1650)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
    ... 16 more
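For what it is worth, the innermost cause is raised while reading a Class
object (note ObjectInputStream.readClass in the trace), which suggests that
the Java-serialized serializer snapshot (CoderTypeSerializer$LegacySnapshot)
contains a reference to com.mymodels.MyPoJo.class. Java serialization records
a class's serialVersionUID in the stream and fails with InvalidClassException
when the local class declares a different value; as far as I can tell,
Avro-generated classes declare a serialVersionUID derived from the schema, so
adding a field changes it. The sketch below reproduces only that JDK
behavior: MyPoJo and ClassHolder are hypothetical stand-ins (ClassHolder plays
the part of a coder that keeps a Class reference), and the UID is patched in
the serialized bytes to simulate a checkpoint written by an older build. Beam
and Flink are not involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SerialVersionUidDemo {
    // Hypothetical stand-in for the Avro-generated class; the explicit
    // serialVersionUID plays the role of the schema-derived one.
    static class MyPoJo implements Serializable {
        private static final long serialVersionUID = -3493429883367292394L;
        String id;
    }

    // Hypothetical stand-in for a coder/serializer that keeps a Class reference.
    static class ClassHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final Class<?> type;

        ClassHolder(Class<?> type) {
            this.type = type;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        }
    }

    // Overwrites the first big-endian occurrence of `from` with `to`; the class
    // descriptor stores serialVersionUID as a raw 8-byte long.
    static byte[] replaceLong(byte[] data, long from, long to) {
        byte[] needle = ByteBuffer.allocate(8).putLong(from).array();
        byte[] replacement = ByteBuffer.allocate(8).putLong(to).array();
        byte[] out = data.clone();
        for (int i = 0; i + 8 <= out.length; i++) {
            if (Arrays.equals(out, i, i + 8, needle, 0, 8)) {
                System.arraycopy(replacement, 0, out, i, 8);
                return out;
            }
        }
        throw new IllegalArgumentException("serialVersionUID not found in stream");
    }

    // Serializes a ClassHolder as if written by a build whose MyPoJo declared
    // uidInStream, then reads it back with the current MyPoJo.
    static String restore(long uidInStream) {
        try {
            long localUid = ObjectStreamClass.lookup(MyPoJo.class).getSerialVersionUID();
            byte[] data = replaceLong(serialize(new ClassHolder(MyPoJo.class)), localUid, uidInStream);
            ClassHolder restored = (ClassHolder) deserialize(data);
            return "restored " + restored.type.getSimpleName();
        } catch (IOException | ClassNotFoundException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Same UID in the stream and locally: the holder is restored.
        System.out.println(restore(-3493429883367292394L));
        // UID of the "old" build: InvalidClassException, local class incompatible.
        System.out.println(restore(-4766138050980652522L));
    }
}
```

With matching UIDs the holder is restored; with the stream UID taken from the
trace, the restore fails with the same "local class incompatible" message as
in the exception above.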

Best Regards,
Pavel Solomin

