Tzu-Li (Gordon) Tai created FLINK-11073:
-------------------------------------------
Summary: Make serializers immutable / provide option TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer
Key: FLINK-11073
URL: https://issues.apache.org/jira/browse/FLINK-11073
Project: Flink
Issue Type: Improvement
Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Fix For: 1.8.0
h2. Motivation
Right now, when a new serializer is provided to the old serializer (or, more specifically, to the old serializer's snapshot) for a state schema compatibility check, and the new serializer can be reconfigured so that it becomes compatible, the only way to achieve this is to reconfigure the new serializer in place and return {{TypeSerializerSchemaCompatibility.compatibleAsIs()}} as the result of the compatibility check.
A concrete example is the {{KryoSerializer}}. Its configuration includes a map from serialized classes to their registered IDs. This mapping may change across restored executions, and the new {{KryoSerializer}} must reconfigure it to match the previous execution before it can be used for state access.
Right now, this is performed by directly mutating the map in the new serializer
instance.
This mutative behaviour is fragile, especially in scale-up / scale-down scenarios, which can easily result in mismatched state serializer configurations across TaskManagers (TMs).
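The current in-place pattern can be modeled in a few lines, together with one assumed way it can leave parallel instances disagreeing (for example after scaling out, when one subtask restores state and another has nothing to restore). This is a minimal sketch only: {{MutableRegistrationSerializer}}, {{reconfigureInPlace}}, {{idOf}} and the example class names are hypothetical and do not reflect the actual {{KryoSerializer}} API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a Kryo-style serializer whose only
// configuration is a map from class name to registration id.
// NOT Flink's KryoSerializer; it only models the in-place reconfiguration pattern.
final class MutableRegistrationSerializer {
    final Map<String, Integer> registrations = new HashMap<>();

    MutableRegistrationSerializer(Map<String, Integer> initial) {
        registrations.putAll(initial);
    }

    // Current pattern: overwrite this instance's configuration with the previous
    // execution's mapping, then report plain "compatible as is".
    String reconfigureInPlace(Map<String, Integer> previous) {
        registrations.clear();
        registrations.putAll(previous);
        return "COMPATIBLE_AS_IS";
    }

    int idOf(String className) {
        return registrations.get(className);
    }
}

final class InPlaceDemo {
    public static void main(String[] args) {
        Map<String, Integer> previousRun = new HashMap<>();
        previousRun.put("com.example.Foo", 10);
        previousRun.put("com.example.Bar", 11);
        Map<String, Integer> currentRun = new HashMap<>(); // registration order changed
        currentRun.put("com.example.Bar", 10);
        currentRun.put("com.example.Foo", 11);

        // Two parallel instances built from the same new job configuration.
        MutableRegistrationSerializer restoring = new MutableRegistrationSerializer(currentRun);
        MutableRegistrationSerializer fresh = new MutableRegistrationSerializer(currentRun);

        // Assumed scenario: only the instance that restores state gets reconfigured.
        restoring.reconfigureInPlace(previousRun);

        System.out.println(restoring.idOf("com.example.Foo")); // 10
        System.out.println(fresh.idOf("com.example.Foo"));     // 11
    }
}
```

Both instances originate from the same job configuration, yet they now map {{com.example.Foo}} to different IDs, which is the kind of cross-TM mismatch the in-place approach makes easy to produce.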
h2. Proposed Approach
# The {{TypeSerializerSchemaCompatibility}} result class should be extended with a new option, {{compatibleWithReconfiguredSerializer(TypeSerializer)}}, which wraps a new, reconfigured instance of the new serializer.
# Callers of the compatibility check need to be aware of this case and respect it, using the provided reconfigured serializer instance whenever one is present. In Flink, there are two places that perform compatibility checks on serializers: 1) composite serializers, which contain nested serializers and therefore need to check the compatibility of those nested serializers, and 2) state backends, which check the compatibility of the new serializer with the old serializer.
# Introduce {{CompositeTypeSerializerSnapshot}} to encapsulate the logic of handling reconfiguration of nested serializers: if a nested serializer of a composite serializer returns a new reconfigured instance of itself, then the result of the compatibility check on the composite serializer should also wrap a reconfigured version of the composite serializer that holds the reconfigured nested serializer. Capturing this logic in a base abstract class such as {{CompositeTypeSerializerSnapshot}} allows it to be shared by many of Flink's composite serializers.
# For composite serializers that still use the legacy, less powerful {{TypeSerializerConfigSnapshot}} and {{CompatibilityResult}} abstractions, a nested serializer signaling that it has been reconfigured should be detected, and an error should be thrown stating that the outer composite serializer needs to be upgraded to the new serializer snapshot and compatibility abstractions. This follows the same approach we used in Flink 1.7 to bridge the new {{TypeSerializerSchemaCompatibility}} and the old {{CompatibilityResult}} classes.
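The four proposed steps can be sketched together in a simplified, self-contained model. Everything here is a hypothetical stand-in chosen for illustration: {{Serializer}}, {{RegistrationSerializer}}, {{ListOf}}, {{Compat}} and {{Checks}} are not Flink classes, and the real {{TypeSerializerSchemaCompatibility}} / {{CompositeTypeSerializerSnapshot}} signatures differ. The sketch assumes the leaf serializer is immutable and that its snapshot can be reduced to the previous registration map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for illustration; NOT Flink's real classes.
interface Serializer {}

// Immutable Kryo-like leaf serializer: its class -> id map is fixed at construction.
final class RegistrationSerializer implements Serializer {
    final Map<String, Integer> registrations;
    RegistrationSerializer(Map<String, Integer> registrations) {
        this.registrations = Collections.unmodifiableMap(new HashMap<>(registrations));
    }
}

// Composite serializer with one nested serializer (think of a list serializer).
final class ListOf implements Serializer {
    final Serializer element;
    ListOf(Serializer element) {
        this.element = element;
    }
}

// Step 1: a compatibility result that can carry a reconfigured serializer.
final class Compat {
    enum Kind { AS_IS, AFTER_MIGRATION, RECONFIGURED, INCOMPATIBLE }

    final Kind kind;
    final Serializer reconfigured; // non-null only when kind == RECONFIGURED

    private Compat(Kind kind, Serializer reconfigured) {
        this.kind = kind;
        this.reconfigured = reconfigured;
    }

    static Compat asIs() { return new Compat(Kind.AS_IS, null); }

    static Compat withReconfigured(Serializer s) { return new Compat(Kind.RECONFIGURED, s); }
}

final class Checks {
    // Leaf check: never mutates `next`; returns a NEW instance using the old ids.
    static Compat checkLeaf(Map<String, Integer> previous, RegistrationSerializer next) {
        if (next.registrations.equals(previous)) {
            return Compat.asIs();
        }
        return Compat.withReconfigured(new RegistrationSerializer(previous));
    }

    // Step 3: composite logic; a reconfigured nested serializer yields a new
    // composite that wraps it. Any other nested result is passed through.
    static Compat checkList(Map<String, Integer> previousElement, ListOf next) {
        Compat nested = checkLeaf(previousElement, (RegistrationSerializer) next.element);
        if (nested.kind == Compat.Kind.RECONFIGURED) {
            return Compat.withReconfigured(new ListOf(nested.reconfigured));
        }
        return nested;
    }

    // Step 2: the caller (e.g. a state backend) uses the reconfigured instance if present.
    static Serializer serializerForStateAccess(Compat result, Serializer next) {
        switch (result.kind) {
            case AS_IS:
                return next;
            case RECONFIGURED:
                return result.reconfigured;
            default:
                throw new UnsupportedOperationException("not modeled in this sketch: " + result.kind);
        }
    }

    // Step 4: a legacy composite cannot propagate reconfiguration, so it fails loudly.
    static void checkLegacyComposite(Compat nested) {
        if (nested.kind == Compat.Kind.RECONFIGURED) {
            throw new IllegalStateException("Nested serializer was reconfigured; upgrade the outer "
                + "composite serializer to the new snapshot and compatibility abstractions.");
        }
    }
}

final class ReconfigureDemo {
    public static void main(String[] args) {
        Map<String, Integer> previous = new HashMap<>();
        previous.put("com.example.Foo", 10);
        Map<String, Integer> current = new HashMap<>();
        current.put("com.example.Foo", 11);

        ListOf next = new ListOf(new RegistrationSerializer(current));
        Compat result = Checks.checkList(previous, next);
        ListOf used = (ListOf) Checks.serializerForStateAccess(result, next);

        System.out.println(result.kind); // RECONFIGURED
        System.out.println(((RegistrationSerializer) used.element).registrations.get("com.example.Foo")); // 10
        System.out.println(((RegistrationSerializer) next.element).registrations.get("com.example.Foo")); // 11
    }
}
```

The design choice illustrated is that no check ever mutates {{next}}: reconfiguration always produces a fresh instance that travels inside the result, so the serializer supplied by the job is never changed behind the caller's back, and the instance actually used for state access is whatever the result carries.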