Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/4943#discussion_r148977758 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException { return this.reader.read(reuse, this.decoder); } + // ------------------------------------------------------------------------ + // Copying + // ------------------------------------------------------------------------ + @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { + public T copy(T from) { checkAvroInitialized(); + return avroData.deepCopy(schema, from); + } - if (this.deepCopyInstance == null) { - this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class); - } - - this.decoder.setIn(source); - this.encoder.setOut(target); + @Override + public T copy(T from, T reuse) { + return copy(from); + } - T tmp = this.reader.read(this.deepCopyInstance, this.decoder); - this.writer.write(tmp, this.encoder); + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + T value = deserialize(source); + serialize(value, target); } - private void checkAvroInitialized() { - if (this.reader == null) { - this.reader = new ReflectDatumReader<T>(type); - this.writer = new ReflectDatumWriter<T>(type); - this.encoder = new DataOutputEncoder(); - this.decoder = new DataInputDecoder(); + // ------------------------------------------------------------------------ + // Compatibility and Upgrades + // ------------------------------------------------------------------------ + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + if (configSnapshot == null) { + checkAvroInitialized(); + configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false)); } + return configSnapshot; } - private void checkKryoInitialized() { - if 
(this.kryo == null) { - this.kryo = new Kryo(); - - Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy(); - instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - kryo.setInstantiatorStrategy(instantiatorStrategy); + @Override + @SuppressWarnings("deprecation") + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) { + // proper schema snapshot, can do the sophisticated schema-based compatibility check + final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString(); + final Schema lastSchema = new Schema.Parser().parse(schemaString); - kryo.setAsmEnabled(true); + final SchemaPairCompatibility compatibility = + SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema); - KryoUtils.applyRegistrations(kryo, kryoRegistrations.values()); + return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ? + CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); + } + else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { + // old snapshot case, just compare the type + // we don't need to restore any Kryo stuff, since Kryo was never used for persistence, + // only for object-to-object copies. + final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot; + return type.equals(old.getTypeClass()) ? 
+ CompatibilityResult.compatible() : CompatibilityResult.requiresMigration(); + } + else { + return CompatibilityResult.requiresMigration(); } } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + @Override + public TypeSerializer<T> duplicate() { + return new AvroSerializer<>(type); + } @Override public int hashCode() { - return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode(); + return 42 + type.hashCode(); } @Override public boolean equals(Object obj) { - if (obj instanceof AvroSerializer) { - @SuppressWarnings("unchecked") - AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj; - - return avroSerializer.canEqual(this) && - type == avroSerializer.type && - typeToInstantiate == avroSerializer.typeToInstantiate; - } else { + if (obj == this) { + return true; + } + else if (obj != null && obj.getClass() == AvroSerializer.class) { + final AvroSerializer that = (AvroSerializer) obj; + return this.type == that.type; + } + else { return false; } } @Override public boolean canEqual(Object obj) { - return obj instanceof AvroSerializer; + return obj.getClass() == this.getClass(); } - // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility - // -------------------------------------------------------------------------------------------- - @Override - public AvroSerializerConfigSnapshot<T> snapshotConfiguration() { - return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations); + public String toString() { + return getClass().getName() + " (" + getType().getName() + ')'; } - @SuppressWarnings("unchecked") - @Override - public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - 
if (configSnapshot instanceof AvroSerializerConfigSnapshot) { - final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot; + // ------------------------------------------------------------------------ + // Initialization + // ------------------------------------------------------------------------ + + private void checkAvroInitialized() { + if (writer == null) { + initializeAvro(); + } + } + + private void initializeAvro() { + final ClassLoader cl = Thread.currentThread().getContextClassLoader(); --- End diff -- This is the pattern the Kryo serializer has used for a while, rather than `type.getClass().getClassLoader()`, because `type` could in theory be a collection class (application class loader) containing the actual type from another class loader. It might not be possible for this to happen with Avro, though; I'm not totally sure...
---