[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238833#comment-16238833 ]
ASF GitHub Bot commented on FLINK-6022: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4943#discussion_r148924346 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -18,118 +18,111 @@ package org.apache.flink.formats.avro.typeutils; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoUtils; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.formats.avro.utils.DataInputDecoder; import org.apache.flink.formats.avro.utils.DataOutputEncoder; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; -import com.esotericsoftware.kryo.Kryo; -import org.apache.avro.generic.GenericData; +import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType; +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.util.Utf8; -import org.objenesis.strategy.StdInstantiatorStrategy; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; 
+import org.apache.avro.specific.SpecificRecord; import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.LinkedHashMap; -import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and - * Kryo for deep object copies. We want to change this to Kryo-only. + * A serializer that serializes types via Avro. * - * @param <T> The type serialized. + * <p>The serializer supports both efficient specific record serialization via for + * types generated via Avro, as well as serialization via reflection + * (ReflectDatumReader / -Writer). The serializer instantiated the types depending on --- End diff -- nit: "instantiated" => "instantiates" > Improve support for Avro GenericRecord > -------------------------------------- > > Key: FLINK-6022 > URL: https://issues.apache.org/jira/browse/FLINK-6022 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System > Reporter: Robert Metzger > Priority: Major > > Currently, Flink serializes the schema for each Avro GenericRecord in the > stream. > This leads to a lot of overhead over the wire and on disk, as well as high serialization costs. > Therefore, I'm proposing to improve the support for GenericRecord in Flink by > shipping the schema to each serializer through the AvroTypeInformation. > With this approach, only GenericRecords of the same type (schema) can be supported per stream, but > the performance will be much better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)