Hi Flink community,

Our Flink application sends different types of protobuf messages over the wire. Since Flink's built-in type serializers cannot handle protobuf messages, we had to register a custom Kryo serializer:
env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);

We found that registering each protobuf class individually is not a viable solution for schema evolution. In particular, when adding or removing messages, we encounter errors when restoring the state backend:

Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_d63019dde9166d2672f543f01f344085_(13/32) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 5 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:504)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
    ... 7 common frames omitted
Caused by: java.lang.IllegalStateException: Missing value for the key 'proto$MyProtoClass2'
    at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)

We then switched to registering a default Kryo serializer for the superclass of all protobuf messages, Message.class, and the issue appears to go away:

see.getConfig().addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);

Questions:

1) It seems that custom Kryo serializers are registered with the Flink state backend. Can we confirm that when a default Kryo serializer is used, only the superclass (e.g. Message.class) is registered and no specific protobuf message class is associated with the state?

2) Will protobuf ser/de be supported by Flink's type serializers in the future, and is there a longer-term roadmap for supporting state evolution for protobuf-type messages?

Thanks a lot.