Hi Flink community:

Our Flink application sends several types of protobuf messages
on the wire. Since protobuf classes cannot be handled by Flink's own type
serializers, we had to register a custom Kryo serializer for each class:

env.getConfig().registerTypeWithKryoSerializer(MyProtoClass1.class, ProtobufSerializer.class);

We found that registering each protobuf class individually is not a viable
approach for schema evolution. In particular, after adding or removing
message types we encounter errors when restoring the operator state backend:


Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_d63019dde9166d2672f543f01f344085_(13/32) from any of the 1 provided restore options.
        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        ... 5 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
        at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
        at org.apache.flink.runtime.state.filesystem.FsStateBackend.createOperatorStateBackend(FsStateBackend.java:504)
        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
        ... 7 common frames omitted
Caused by: java.lang.IllegalStateException: Missing value for the key 'proto$MyProtoClass2'
        at org.apache.flink.util.LinkedOptionalMap.unwrapOptionals(LinkedOptionalMap.java:190)

We then switched to registering a default Kryo serializer for the superclass
of all protobuf messages, Message.class, and the issue appears to go away:

env.getConfig().addDefaultKryoSerializer(Message.class, ProtobufSerializer.class);
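To make the difference between the two registrations concrete, here is a minimal Java sketch of one possible reading of the failure. It is a simplified model and not Flink's actual restore code: the class RegistrationRestoreSketch and its methods resolve and unwrap are hypothetical, and it assumes that the snapshot records registered classes by name and that restore fails when a recorded name can no longer be resolved, as the "Missing value for the key" message in the trace suggests.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Simplified, hypothetical model of name-based registration restore (NOT Flink code). */
public class RegistrationRestoreSketch {

    /** Maps each snapshotted class name to itself if it still exists in the job, else to empty. */
    public static Map<String, Optional<String>> resolve(Set<String> snapshotKeys,
                                                        Set<String> availableClasses) {
        Map<String, Optional<String>> resolved = new LinkedHashMap<>();
        for (String key : snapshotKeys) {
            resolved.put(key, availableClasses.contains(key) ? Optional.of(key) : Optional.empty());
        }
        return resolved;
    }

    /** Returns the resolved names, or throws on the first name that could not be resolved. */
    public static Set<String> unwrap(Map<String, Optional<String>> resolved) {
        Set<String> out = new LinkedHashSet<>();
        for (Map.Entry<String, Optional<String>> entry : resolved.entrySet()) {
            if (!entry.getValue().isPresent()) {
                throw new IllegalStateException("Missing value for the key '" + entry.getKey() + "'");
            }
            out.add(entry.getValue().get());
        }
        return out;
    }

    public static void main(String[] args) {
        // Classes present in the new job version: MyProtoClass2 has been removed.
        Set<String> current = new LinkedHashSet<>(
                Arrays.asList("proto$MyProtoClass1", "com.google.protobuf.Message"));

        // Old snapshot taken with one registration per concrete protobuf class.
        Set<String> perClass = new LinkedHashSet<>(
                Arrays.asList("proto$MyProtoClass1", "proto$MyProtoClass2"));
        try {
            unwrap(resolve(perClass, current));
            System.out.println("per-class registration: restore succeeded");
        } catch (IllegalStateException e) {
            System.out.println("per-class registration: " + e.getMessage());
        }

        // Old snapshot taken with a single default serializer on the superclass.
        Set<String> superclassOnly = Collections.singleton("com.google.protobuf.Message");
        System.out.println("superclass default: restored " + unwrap(resolve(superclassOnly, current)));
    }
}
```

Under these assumptions, the per-class snapshot fails as soon as one recorded class disappears, while the superclass-only snapshot depends only on Message.class, which does not change when individual messages are added or removed.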


Questions:

1)  It seems custom Kryo serializers are registered with the Flink state
backend. Can you confirm that when using a default Kryo serializer, only the
superclass (e.g. Message.class) is registered and no specific protobuf
message class is associated with the state?

2)  Will protobuf ser/de be supported by Flink's type serializers in the
future, and is there a longer-term roadmap for supporting state evolution for
protobuf-type messages?

Thanks a lot.
