Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4943#discussion_r148924346 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -18,118 +18,111 @@ package org.apache.flink.formats.avro.typeutils; -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoRegistration; import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot; -import org.apache.flink.api.java.typeutils.runtime.KryoUtils; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.formats.avro.utils.DataInputDecoder; import org.apache.flink.formats.avro.utils.DataOutputEncoder; import org.apache.flink.util.InstantiationUtil; -import org.apache.flink.util.Preconditions; -import com.esotericsoftware.kryo.Kryo; -import org.apache.avro.generic.GenericData; +import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType; +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.util.Utf8; -import org.objenesis.strategy.StdInstantiatorStrategy; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; import java.io.IOException; -import java.io.ObjectInputStream; -import java.util.LinkedHashMap; -import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and - * Kryo for deep object copies. We want to change this to Kryo-only. + * A serializer that serializes types via Avro. * - * @param <T> The type serialized. + * <p>The serializer supports both efficient specific record serialization via for + * types generated via Avro, as well as serialization via reflection + * (ReflectDatumReader / -Writer). The serializer instantiated the types depending on --- End diff -- nit: "instantiated" => "instantiates"
---