[ https://issues.apache.org/jira/browse/FLINK-1391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14288922#comment-14288922 ]

ASF GitHub Bot commented on FLINK-1391:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/323#discussion_r23435527

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java ---
    @@ -237,6 +244,25 @@ private void checkKryoInitialized() {
                 // Throwable and all subclasses should be serialized via java serialization
                 kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
    +            // If the type we have to serialize as a GenricType is implementing SpecificRecordBase,
    +            // we have to register the avro serializer
    +            // This rule only applies if users explicitly use the GenericTypeInformation for the avro types
    +            // usually, we are able to handle Avro POJOs with the POJO serializer.
    +            if(SpecificRecordBase.class.isAssignableFrom(type)) {
    +                ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(type);
    +                this.kryo.register(type, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
    +
    +            }
    +            // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
    +            // because Kryo is not able to serialize them properly, we use this serializer for them
    +            this.kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer(ArrayList.class));
    --- End diff --

    Should we make this registration conditional so that it only happens when we have encountered an Avro type?


> Kryo fails to properly serialize avro collection types
> ------------------------------------------------------
>
>                 Key: FLINK-1391
>                 URL: https://issues.apache.org/jira/browse/FLINK-1391
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 0.8, 0.9
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> Before FLINK-610, Avro was the default generic serializer.
> Now, special types coming from Avro are handled by Kryo .. which seems to cause errors like:
> {code}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NullPointerException
> 	at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:143)
> 	at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:148)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:244)
> 	at org.apache.flink.runtime.plugable.DeserializationDelegate.read(DeserializationDelegate.java:56)
> 	at org.apache.flink.runtime.io.network.serialization.AdaptiveSpanningRecordDeserializer.getNextRecord(AdaptiveSpanningRecordDeserializer.java:71)
> 	at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:189)
> 	at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
> 	at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
> 	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
> 	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
> 	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
> 	at java.lang.Thread.run(Thread.java:744)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)