Hello! I am using Beam 2.30.0 with Flink runner 1.11.
The app receives input data as POJOs, groups them into fixed windows, and then writes one file per window to AWS S3. As the data evolves, I want to add new fields to my POJOs (which are actually Avro-generated classes). Given that the new POJO is fully compatible with the old one, I would expect the state to be restored successfully once I deploy the changed application code.

I have been looking for code examples that handle this case and couldn't find any. I couldn't find a section of the documentation that discusses the evolution of stateful processors either.

So far I have tried several approaches:

- not specifying any coders (as my Avro-generated POJOs are already Serializable)
- using KryoCoder (mentioned in https://beam.apache.org/documentation/sdks/java/euphoria/)
- using AvroCoder (https://beam.apache.org/releases/javadoc/2.30.0/org/apache/beam/sdk/coders/AvroCoder.html)
- implementing my own coder (see below)

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.OutputStream;
    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.coders.CustomCoder;
    import com.mymodels.MyPoJo;

    // Encodes MyPoJo through the Externalizable methods of the Avro-generated class.
    public class MyPoJoCoder extends CustomCoder<MyPoJo> {
        @Override
        public void encode(MyPoJo value, OutputStream outStream) throws CoderException, IOException {
            ObjectOutputStream oos = new ObjectOutputStream(outStream);
            value.writeExternal(oos);
            // ObjectOutputStream buffers block data internally: flush it so the bytes
            // reach outStream (do not close it, that would also close outStream).
            oos.flush();
        }

        @Override
        public MyPoJo decode(InputStream inStream) throws CoderException, IOException {
            MyPoJo value = new MyPoJo();
            ObjectInputStream ois = new ObjectInputStream(inStream);
            value.readExternal(ois);
            return value;
        }
    }

but I am still getting this exception:

    java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_3ae0e54ab75888aa89ffd50dca3abb31_(1/1) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:265)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:152)
        ... 9 more
    Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:552)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:256)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        ... 11 more
    Caused by: java.lang.IllegalStateException: Could not Java-deserialize TypeSerializer while restoring checkpoint metadata for serializer snapshot 'org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$LegacySnapshot'. Please update to the TypeSerializerSnapshot interface that removes Java Serialization to avoid this problem in the future.
        at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.restoreSerializer(TypeSerializerConfigSnapshot.java:138)
        at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
        at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
        at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.getPartitionStateSerializer(RegisteredOperatorStateBackendMetaInfo.java:113)
        at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:94)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
        ... 15 more
    Caused by: java.io.InvalidClassException: com.mymodels.MyPoJo; local class incompatible: stream classdesc serialVersionUID = -4766138050980652522, local class serialVersionUID = -3493429883367292394
        at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
        at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2012)
        at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1862)
        at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1825)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1650)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
        at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
        at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
        at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
        at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
        at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
        ... 16 more

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | LinkedIn <https://www.linkedin.com/in/pavelsolomin>
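The encode/decode pattern used by MyPoJoCoder can be checked outside Beam and Flink. Below is a minimal pure-JDK sketch, assuming a hypothetical Externalizable stand-in class `Point` (not the real Avro-generated MyPoJo) and hypothetical helper names `encode`, `decode` and `bytesWrittenWithoutFlush`. It shows that ObjectOutputStream keeps block data in an internal buffer until `flush()` is called, so a coder built on it has to flush (not close) the stream before `encode` returns. It only covers the coder's byte layout; it does not touch the serialVersionUID check from the trace, which fails while Flink Java-deserializes the legacy serializer snapshot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

public class ExternalizableRoundTrip {

    /** Hypothetical stand-in for an Avro-generated record (assumption, not MyPoJo). */
    public static class Point implements Externalizable {
        int x;
        String label;

        public Point() {} // Externalizable requires a public no-arg constructor

        public Point(int x, String label) {
            this.x = x;
            this.label = label;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(x);
            out.writeUTF(label);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            x = in.readInt();
            label = in.readUTF();
        }
    }

    /** Mirrors the coder's encode(): one ObjectOutputStream per element, flushed, not closed. */
    static void encode(Point value, OutputStream outStream) throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(outStream);
        value.writeExternal(oos);
        oos.flush(); // pushes the buffered block data through to outStream
    }

    /** Mirrors the coder's decode(): one ObjectInputStream per element. */
    static Point decode(InputStream inStream) throws IOException {
        Point value = new Point();
        ObjectInputStream ois = new ObjectInputStream(inStream);
        value.readExternal(ois);
        return value;
    }

    /** Same as encode() but without flush(); returns the bytes that reached the target. */
    static int bytesWrittenWithoutFlush(Point value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bytes);
        value.writeExternal(oos);
        return bytes.size(); // only the stream header written by the constructor
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(new Point(42, "window-1"), bytes);
        Point decoded = decode(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(decoded.x + " " + decoded.label);
        System.out.println(bytesWrittenWithoutFlush(new Point(42, "window-1")));
    }
}
```

Running `main` prints the decoded fields and then the size of the unflushed output, which is just the 4-byte stream header the ObjectOutputStream constructor writes; the record's own fields never reach the target stream without the flush.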