[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444068#comment-16444068
 ] 

ASF GitHub Bot commented on FLINK-8836:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182750453
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
 ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig 
executionConfig){
         * Copy-constructor that does not copy transient fields. They will be 
initialized once required.
         */
        protected KryoSerializer(KryoSerializer<T> toCopy) {
    -           defaultSerializers = toCopy.defaultSerializers;
    -           defaultSerializerClasses = toCopy.defaultSerializerClasses;
     
    -           kryoRegistrations = toCopy.kryoRegistrations;
    +           this.type = checkNotNull(toCopy.type, "Type class cannot be 
null.");
    +           this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +           this.defaultSerializers = new 
LinkedHashMap<>(toCopy.defaultSerializers.size());
    +           this.kryoRegistrations = new 
LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +           // deep copy the serializer instances in defaultSerializers
    +           for (Map.Entry<Class<?>, 
ExecutionConfig.SerializableSerializer<?>> entry :
    +                   toCopy.defaultSerializers.entrySet()) {
     
    -           type = toCopy.type;
    -           if(type == null){
    -                   throw new NullPointerException("Type class cannot be 
null.");
    +                   this.defaultSerializers.put(entry.getKey(), 
deepCopySerializer(entry.getValue()));
    +           }
    +
    +           // deep copy the serializer instances in kryoRegistrations
    +           for (Map.Entry<String, KryoRegistration> entry : 
toCopy.kryoRegistrations.entrySet()) {
    --- End diff --
    
    One alternative approach to this loop (though I'm not sure would be 
better), is in the `buildKryoRegistrationsMethod` we always make a copy of the 
`ExecutionConfig.SerializableSerializer` when instantiating its corresponding 
`KryoRegistration`.
    See 
https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537.
 Here we can make a copy already when building the registrations.
    
    Then, when duplicating the `KryoSerializer`, for duplicating the 
registrations, this would only be a matter of calling `buildKryoRegistrations` 
again with the execution config because that method would handle stateful 
serializer registrations properly.
    IMO, this seems like a cleaner solution. What do you think?


> Duplicating a KryoSerializer does not duplicate registered default serializers
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8836
>                 URL: https://issues.apache.org/jira/browse/FLINK-8836
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> Shortly put, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are directly provided to the new 
> {{KryoSerializer}} instance.
>  This causes the fact that those default serializers are shared across two 
> different {{KryoSerializer}} instances, and therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to