Tzu-Li (Gordon) Tai created FLINK-11073:
-------------------------------------------

             Summary: Make serializers immutable / provide option 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer
                 Key: FLINK-11073
                 URL: https://issues.apache.org/jira/browse/FLINK-11073
             Project: Flink
          Issue Type: Improvement
          Components: Type Serialization System
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai
             Fix For: 1.8.0


h2. Motivation

Right now, when a new serializer is provided to the old serializer (or, to be 
more specific, the old serializer's snapshot) for state schema compatibility 
checks, and the new serializer can be reconfigured to become compatible, 
the only way to do this is to reconfigure the new serializer in-place and 
return {{TypeSerializerSchemaCompatibility.compatibleAsIs()}} as the result of 
the compatibility check.

One solid example is the {{KryoSerializer}}. The {{KryoSerializer}} holds, as 
part of its configuration, a map of serialized classes to their registered ids. 
This mapping may change on restore executions, and the new {{KryoSerializer}} 
must reconfigure this mapping to match the previous execution before it can be 
used for state access.
Right now, this is performed by directly mutating the map in the new serializer 
instance.

This mutative behaviour is fragile, especially when taking into account scale 
down / up scenarios which could easily result in mismatching state serializer 
configurations across TMs.
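To make the contrast concrete, here is a minimal sketch of the non-mutating alternative. {{ToyKryoSerializer}}, {{reconfiguredFrom}} and the id-merging policy are hypothetical names and assumptions made for illustration only; this is not Flink's {{KryoSerializer}} and does not reflect how Kryo actually assigns registration ids. The point is only that the original instance stays untouched and a new instance carries the restored mapping.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Kryo-style serializer; NOT Flink's KryoSerializer.
final class ToyKryoSerializer {
    // Class name -> registration id; never modified after construction.
    private final Map<String, Integer> registrations;

    ToyKryoSerializer(Map<String, Integer> registrations) {
        this.registrations =
                Collections.unmodifiableMap(new LinkedHashMap<>(registrations));
    }

    Map<String, Integer> registrations() {
        return registrations;
    }

    // Returns a NEW serializer whose ids match the previous execution.
    // Assumed policy: classes unknown to the previous execution receive
    // fresh ids after the highest previously used id.
    ToyKryoSerializer reconfiguredFrom(Map<String, Integer> previousRegistrations) {
        Map<String, Integer> merged = new LinkedHashMap<>(previousRegistrations);
        int nextId = previousRegistrations.values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(-1) + 1;
        for (String className : registrations.keySet()) {
            if (!merged.containsKey(className)) {
                merged.put(className, nextId++);
            }
        }
        return new ToyKryoSerializer(merged);
    }
}
```

Because the map is wrapped as unmodifiable and {{reconfiguredFrom}} never writes to {{this}}, a serializer instance that is shared or duplicated cannot end up with a configuration that differs from the one it was created with.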

h2. Proposed Approach
 # The {{TypeSerializerSchemaCompatibility}} result class should be extended to 
contain an option {{compatibleWithReconfiguredSerializer(TypeSerializer)}}, 
which would wrap a new instance of a reconfigured version of the new serializer.
 # Callers of the compatibility check need to be aware of this case and 
respect it, using the reconfigured serializer instance when one is provided. 
In Flink, there are two places which perform compatibility checks on 
serializers: 1) composite serializers which contain nested serializers, and 
therefore need to check the compatibility of their nested serializers, and 2) 
state backends, which check the compatibility of the new serializer with the 
old serializer.
 # Introduce {{CompositeTypeSerializerSnapshot}} to encapsulate the logic of 
handling reconfiguration of nested serializers: if a composite serializer has a 
nested serializer that returns a new reconfigured instance of itself, then the 
result of the compatibility check on the composite serializer should also wrap 
a reconfigured version of the composite serializer that holds the reconfigured 
nested serializer. This logic should be captured in an abstract base class, say 
{{CompositeTypeSerializerSnapshot}}, so that it can be shared by many of 
Flink's composite serializers.
 # For composite serializers that are still using the legacy, less-powerful 
{{TypeSerializerConfigSnapshot}} and {{CompatibilityResult}} abstractions, 
the case where a nested serializer signals that it has been reconfigured 
should be detected, and an error should be thrown stating that the outer 
composite serializer needs to be upgraded to use the new serializer snapshot 
and compatibility abstractions. This follows the same approach we used for 
bridging the new {{TypeSerializerSchemaCompatibility}} and old 
{{CompatibilityResult}} classes in Flink 1.7.
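
The first three points above can be sketched as follows. Every type here ({{ToySerializer}}, {{ToyCompatibility}}, {{ToyLeafSerializer}}, {{ToyListSerializer}}, {{ToyStateBackend}}) is a simplified, hypothetical stand-in written for this illustration, and the "configuration" is reduced to a single integer; none of this is Flink's actual API beyond the factory method names taken from the proposal. It shows a result type carrying a reconfigured instance, a composite propagating a nested reconfiguration by building a new composite, and a caller honouring the result.

```java
// All types below are simplified, hypothetical stand-ins, not Flink's classes.
interface ToySerializer {}

final class ToyCompatibility {
    enum Kind { COMPATIBLE_AS_IS, COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, INCOMPATIBLE }

    private final Kind kind;
    private final ToySerializer reconfigured; // non-null only in the reconfigured case

    private ToyCompatibility(Kind kind, ToySerializer reconfigured) {
        this.kind = kind;
        this.reconfigured = reconfigured;
    }

    static ToyCompatibility compatibleAsIs() {
        return new ToyCompatibility(Kind.COMPATIBLE_AS_IS, null);
    }

    static ToyCompatibility compatibleWithReconfiguredSerializer(ToySerializer serializer) {
        return new ToyCompatibility(Kind.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER, serializer);
    }

    static ToyCompatibility incompatible() {
        return new ToyCompatibility(Kind.INCOMPATIBLE, null);
    }

    Kind kind() {
        return kind;
    }

    ToySerializer reconfiguredSerializer() {
        if (kind != Kind.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER) {
            throw new IllegalStateException("No reconfigured serializer for result " + kind);
        }
        return reconfigured;
    }
}

// Leaf serializer whose whole configuration is one immutable integer.
final class ToyLeafSerializer implements ToySerializer {
    final int configVersion;

    ToyLeafSerializer(int configVersion) {
        this.configVersion = configVersion;
    }

    // Check against the configuration recorded in the old snapshot.
    ToyCompatibility checkAgainst(int snapshotVersion) {
        if (snapshotVersion == configVersion) {
            return ToyCompatibility.compatibleAsIs();
        }
        // Do not mutate this instance; hand back a new, reconfigured one.
        return ToyCompatibility.compatibleWithReconfiguredSerializer(
                new ToyLeafSerializer(snapshotVersion));
    }
}

// Composite serializer with a single nested serializer.
final class ToyListSerializer implements ToySerializer {
    final ToyLeafSerializer element;

    ToyListSerializer(ToyLeafSerializer element) {
        this.element = element;
    }

    ToyCompatibility checkAgainst(int elementSnapshotVersion) {
        ToyCompatibility nested = element.checkAgainst(elementSnapshotVersion);
        switch (nested.kind()) {
            case COMPATIBLE_AS_IS:
                return ToyCompatibility.compatibleAsIs();
            case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
                // Wrap the reconfigured nested serializer in a new composite.
                return ToyCompatibility.compatibleWithReconfiguredSerializer(
                        new ToyListSerializer((ToyLeafSerializer) nested.reconfiguredSerializer()));
            default:
                return ToyCompatibility.incompatible();
        }
    }
}

// Caller side: decide which serializer instance to use for state access.
final class ToyStateBackend {
    static ToySerializer resolve(ToyListSerializer newSerializer, int elementSnapshotVersion) {
        ToyCompatibility result = newSerializer.checkAgainst(elementSnapshotVersion);
        switch (result.kind()) {
            case COMPATIBLE_AS_IS:
                return newSerializer;
            case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
                return result.reconfiguredSerializer();
            default:
                throw new IllegalStateException("New serializer is incompatible with restored state");
        }
    }
}
```

In this sketch, {{ToyStateBackend.resolve(serializer, v)}} returns the very same instance when the nested configuration already matches, and a distinct composite holding a reconfigured element otherwise; the serializer passed in is never modified in either case.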



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
