Hi Yun:

Thanks for the quick reply, and for pointing to FLINK-11333 <https://issues.apache.org/jira/browse/FLINK-11333>. I will take a look.
We are currently using Flink 1.8.0. To summarize the behavior:

1) In the first version of the Flink app, one protobuf class is registered with Kryo:

    env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);

2) The Flink app is then updated with a second protobuf class, say, MyProtoClass2.class:

    env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);
    env.getConfig().registerTypeWithKryoSerializer(MyProtoClass2.class, ProtobufSerializer.class);

3) Cancel the first version of the Flink app, take a savepoint, and deploy the second version of the Flink app from that savepoint. Occasionally one would encounter the exception described above.

So far we have been able to work around this issue by registering a default Kryo serializer for Message.class <https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/Message>, the common supertype of all generated protobuf messages.

NOTE: The above is a high-level example. In practice, our app is more complex, with a much larger number of protobuf classes. Messages may be added or deleted across different versions of the Flink app.

On Wed, Aug 7, 2019 at 11:23 AM Yun Tang <myas...@live.com> wrote:

> Hi Ying
>
> What version of Flink are you using and please more exception stack.
> Moreover, what is the relationship between `MyProtoClass2` and
> `MyProtoClass1`? As far as I know, registering the Message class should not
> be the proper solution.
>
> For the 2nd question, you could refer to FLINK-11333 [1] for more
> information.
>
> CC @Tzu-Li (Gordon) Tai<mailto:tzuli...@apache.org> as he might provide
> more information about this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-11333
>
> Best
> Yun Tang
>
> ________________________________
> From: Ying Xu <y...@lyft.com>
> Sent: Thursday, August 8, 2019 1:51
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Custom type serializer inside Flink state
>
> Hi Flink community:
>
> Our Flink application sends different types of protobuf messages
> on-the-wire. Since protobuf messages cannot be handled by Flink's type
> serializers, we had to register a custom Kryo serializer:
>
> env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);
>
> We found that registering each protobuf class is not a viable solution for
> schema evolution. In particular, when adding or removing messages we
> encounter errors when restoring the state backend:
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_d63019dde9166d2672f543f01f344085_(13/32) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>     ... 5 common frames omitted
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:504)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
>     ... 7 common frames omitted
> Caused by: java.lang.IllegalStateException: Missing value for the key 'proto$MyProtoClass2'
>     at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)
>
> We then switched to registering a default Kryo serializer for the common
> supertype of all protobuf messages, Message.class, and this issue appears
> to go away.
>
> see.getConfig().addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);
>
> Questions:
>
> 1) It seems custom Kryo serializers are registered with the Flink state
> backend. Can we confirm that when using the default Kryo serializer, only
> the supertype (e.g. Message.class) is registered and no specific protobuf
> message is associated with state?
>
> 2) Will protobuf ser/de be supported by Flink's type serializers in the
> future, and is there a longer-term roadmap for supporting state evolution
> for protobuf-type messages?
>
> Thanks a lot.
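
P.S. To illustrate why a single default serializer on Message.class can cover every generated class: as far as I understand, Kryo resolves a type that has no exact registration by checking its default serializers for one whose key type is assignable from the requested class. The toy Java sketch below models only that lookup order. The class SerializerLookupSketch, its methods, the nested stand-in types, and the "GenericFallbackSerializer" name are all hypothetical and are not Kryo or Flink APIs. It does not model what Flink records in a savepoint, so it does not answer question 1.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: hypothetical names, not Kryo's or Flink's actual code.
class SerializerLookupSketch {

    // Stand-ins for com.google.protobuf.Message and two generated message classes.
    interface Message {}
    static class MyProtoClass1 implements Message {}
    static class MyProtoClass2 implements Message {}

    // Exact-class registrations, in the spirit of registerTypeWithKryoSerializer(...).
    private final Map<Class<?>, String> exact = new LinkedHashMap<>();
    // Default serializers keyed by a supertype, in the spirit of addDefaultKryoSerializer(...).
    private final Map<Class<?>, String> defaults = new LinkedHashMap<>();

    void registerType(Class<?> type, String serializerName) {
        exact.put(type, serializerName);
    }

    void addDefault(Class<?> superType, String serializerName) {
        defaults.put(superType, serializerName);
    }

    // Number of classes that had to be listed one by one.
    int exactRegistrationCount() {
        return exact.size();
    }

    // An exact registration wins; otherwise the first default whose key is a
    // supertype of the requested class is used; otherwise a generic fallback.
    String resolve(Class<?> type) {
        String byExactType = exact.get(type);
        if (byExactType != null) {
            return byExactType;
        }
        for (Map.Entry<Class<?>, String> entry : defaults.entrySet()) {
            if (entry.getKey().isAssignableFrom(type)) {
                return entry.getValue();
            }
        }
        return "GenericFallbackSerializer";
    }

    public static void main(String[] args) {
        // Version 1 of the app: only MyProtoClass1 is listed explicitly.
        SerializerLookupSketch perClass = new SerializerLookupSketch();
        perClass.registerType(MyProtoClass1.class, "ProtobufSerializer");

        // Workaround: one default serializer keyed by the common supertype.
        SerializerLookupSketch bySuperType = new SerializerLookupSketch();
        bySuperType.addDefault(Message.class, "ProtobufSerializer");

        // A class added later is only covered in the second setup.
        System.out.println(perClass.resolve(MyProtoClass2.class));
        System.out.println(bySuperType.resolve(MyProtoClass2.class));
    }
}
```

In this model the set of explicitly listed classes changes whenever a message is added or removed, while the supertype-keyed setup keeps the same single entry across app versions.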