[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444068#comment-16444068 ]
ASF GitHub Bot commented on FLINK-8836:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5880#discussion_r182750453

    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java ---
    @@ -140,14 +140,37 @@ public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
     	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
     	 */
     	protected KryoSerializer(KryoSerializer<T> toCopy) {
    -		defaultSerializers = toCopy.defaultSerializers;
    -		defaultSerializerClasses = toCopy.defaultSerializerClasses;
    -		kryoRegistrations = toCopy.kryoRegistrations;
    +		this.type = checkNotNull(toCopy.type, "Type class cannot be null.");
    +		this.defaultSerializerClasses = toCopy.defaultSerializerClasses;
    +		this.defaultSerializers = new LinkedHashMap<>(toCopy.defaultSerializers.size());
    +		this.kryoRegistrations = new LinkedHashMap<>(toCopy.kryoRegistrations.size());
    +
    +		// deep copy the serializer instances in defaultSerializers
    +		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry :
    +			toCopy.defaultSerializers.entrySet()) {
    -		type = toCopy.type;
    -		if(type == null){
    -			throw new NullPointerException("Type class cannot be null.");
    +			this.defaultSerializers.put(entry.getKey(), deepCopySerializer(entry.getValue()));
    +		}
    +
    +		// deep copy the serializer instances in kryoRegistrations
    +		for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {
    --- End diff --

    One alternative approach to this loop (though I'm not sure it would be better) is to always make a copy of the `ExecutionConfig.SerializableSerializer` in the `buildKryoRegistrations` method when instantiating its corresponding `KryoRegistration`.
    See https://github.com/apache/flink/blob/be7c89596a3b9cd8805a90aaf32336ec2759a1f7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L537.
    Here we can already make a copy when building the registrations.
    Then, when duplicating the `KryoSerializer`, duplicating the registrations would only be a matter of calling `buildKryoRegistrations` again with the execution config, because that method would handle stateful serializer registrations properly.
    IMO, this seems like a cleaner solution. What do you think?


> Duplicating a KryoSerializer does not duplicate registered default serializers
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-8836
>                 URL: https://issues.apache.org/jira/browse/FLINK-8836
>             Project: Flink
>          Issue Type: Bug
>          Components: Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Stefan Richter
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are passed directly to the new {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different {{KryoSerializer}} instances, so the result is not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
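
To make the sharing problem from the issue description concrete, here is a minimal, self-contained sketch that does not depend on Flink or Kryo. All names in it (`KryoDuplicateSketch`, `StatefulSerializer`, `shallowDuplicate`, `deepDuplicate`, `deepCopy`) are hypothetical stand-ins, and the deep copy uses a plain Java serialization round trip, which is only one possible strategy and is not claimed to be how `deepCopySerializer` in the pull request works.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

final class KryoDuplicateSketch {

    /** Hypothetical stand-in for a stateful, serializable serializer instance. */
    static class StatefulSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        int callCount; // mutable state that duplicates must not share
    }

    /** Deep copy through a Java serialization round trip (assumed strategy). */
    @SuppressWarnings("unchecked")
    static <S extends Serializable> S deepCopy(S original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (S) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy serializer instance", e);
        }
    }

    /** Mirrors the old copy constructor: the same map and instances are handed over. */
    static Map<Class<?>, StatefulSerializer> shallowDuplicate(
            Map<Class<?>, StatefulSerializer> source) {
        return source;
    }

    /** Mirrors the proposed loop: a new map holding a copy of every instance. */
    static Map<Class<?>, StatefulSerializer> deepDuplicate(
            Map<Class<?>, StatefulSerializer> source) {
        Map<Class<?>, StatefulSerializer> copy = new LinkedHashMap<>(source.size());
        for (Map.Entry<Class<?>, StatefulSerializer> entry : source.entrySet()) {
            copy.put(entry.getKey(), deepCopy(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<Class<?>, StatefulSerializer> original = new LinkedHashMap<>();
        original.put(String.class, new StatefulSerializer());

        Map<Class<?>, StatefulSerializer> shared = shallowDuplicate(original);
        Map<Class<?>, StatefulSerializer> isolated = deepDuplicate(original);

        // Mutate the original instance after both duplicates were created.
        original.get(String.class).callCount++;

        System.out.println("shallow duplicate callCount: " + shared.get(String.class).callCount);
        System.out.println("deep duplicate callCount:    " + isolated.get(String.class).callCount);
    }
}
```

In this sketch the shallow duplicate observes the mutation because it holds the very same instance, while the deep duplicate does not. The alternative raised in the review comment, rebuilding the registrations from the execution config, would aim for the same isolation without an explicit per-entry copy loop in the copy constructor.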