[ https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238832#comment-16238832 ]
ASF GitHub Bot commented on FLINK-6022:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4943#discussion_r148924463

--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
@@ -153,111 +146,215 @@ public T deserialize(T reuse, DataInputView source) throws IOException {
         return this.reader.read(reuse, this.decoder);
     }

+    // ------------------------------------------------------------------------
+    //  Copying
+    // ------------------------------------------------------------------------
+
     @Override
-    public void copy(DataInputView source, DataOutputView target) throws IOException {
+    public T copy(T from) {
         checkAvroInitialized();
+        return avroData.deepCopy(schema, from);
+    }

-        if (this.deepCopyInstance == null) {
-            this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
-        }
-
-        this.decoder.setIn(source);
-        this.encoder.setOut(target);
+    @Override
+    public T copy(T from, T reuse) {
+        return copy(from);
+    }

-        T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-        this.writer.write(tmp, this.encoder);
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        T value = deserialize(source);
+        serialize(value, target);
     }

-    private void checkAvroInitialized() {
-        if (this.reader == null) {
-            this.reader = new ReflectDatumReader<T>(type);
-            this.writer = new ReflectDatumWriter<T>(type);
-            this.encoder = new DataOutputEncoder();
-            this.decoder = new DataInputDecoder();
+    // ------------------------------------------------------------------------
+    //  Compatibility and Upgrades
+    // ------------------------------------------------------------------------
+
+    @Override
+    public TypeSerializerConfigSnapshot snapshotConfiguration() {
+        if (configSnapshot == null) {
+            checkAvroInitialized();
+            configSnapshot = new AvroSchemaSerializerConfigSnapshot(schema.toString(false));
         }
+        return configSnapshot;
     }

-    private void checkKryoInitialized() {
-        if (this.kryo == null) {
-            this.kryo = new Kryo();
-
-            Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-            instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-            kryo.setInstantiatorStrategy(instantiatorStrategy);
+    @Override
+    @SuppressWarnings("deprecation")
+    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+        if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot) {
+            // proper schema snapshot, can do the sophisticated schema-based compatibility check
+            final String schemaString = ((AvroSchemaSerializerConfigSnapshot) configSnapshot).getSchemaString();
+            final Schema lastSchema = new Schema.Parser().parse(schemaString);

-            kryo.setAsmEnabled(true);
+            final SchemaPairCompatibility compatibility =
+                    SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);

-            KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
+            return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
+                    CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+        }
+        else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
+            // old snapshot case, just compare the type
+            // we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
+            // only for object-to-object copies.
+            final AvroSerializerConfigSnapshot old = (AvroSerializerConfigSnapshot) configSnapshot;
+            return type.equals(old.getTypeClass()) ?
+                    CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+        }
+        else {
+            return CompatibilityResult.requiresMigration();
        }
    }

-    // --------------------------------------------------------------------------------------------
+    // ------------------------------------------------------------------------
+    //  Utilities
+    // ------------------------------------------------------------------------
+
+    @Override
+    public TypeSerializer<T> duplicate() {
+        return new AvroSerializer<>(type);
+    }

     @Override
     public int hashCode() {
-        return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
+        return 42 + type.hashCode();
     }

     @Override
     public boolean equals(Object obj) {
-        if (obj instanceof AvroSerializer) {
-            @SuppressWarnings("unchecked")
-            AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
-            return avroSerializer.canEqual(this) &&
-                type == avroSerializer.type &&
-                typeToInstantiate == avroSerializer.typeToInstantiate;
-        } else {
+        if (obj == this) {
+            return true;
+        }
+        else if (obj != null && obj.getClass() == AvroSerializer.class) {
+            final AvroSerializer that = (AvroSerializer) obj;
+            return this.type == that.type;
+        }
+        else {
             return false;
         }
     }

     @Override
     public boolean canEqual(Object obj) {
-        return obj instanceof AvroSerializer;
+        return obj.getClass() == this.getClass();
     }

-    // --------------------------------------------------------------------------------------------
-    // Serializer configuration snapshotting & compatibility
-    // --------------------------------------------------------------------------------------------
-
     @Override
-    public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-        return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
+    public String toString() {
+        return getClass().getName() + " (" + getType().getName() + ')';
     }

-    @SuppressWarnings("unchecked")
-    @Override
-    public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-        if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-            final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
+    // ------------------------------------------------------------------------
+    //  Initialization
+    // ------------------------------------------------------------------------
+
+    private void checkAvroInitialized() {
+        if (writer == null) {
+            initializeAvro();
+        }
+    }
+
+    private void initializeAvro() {
+        final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+        if (SpecificRecord.class.isAssignableFrom(type)) {
+            this.avroData = new SpecificData(cl);
+            this.schema = this.avroData.getSchema(type);
+            this.reader = new SpecificDatumReader<>(schema, schema, avroData);
+            this.writer = new SpecificDatumWriter<>(schema, avroData);
+        }
+        else {
+            final ReflectData reflectData = new ReflectData(cl);
+            this.avroData = reflectData;
+            this.schema = this.avroData.getSchema(type);
+            this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
+            this.writer = new ReflectDatumWriter<>(schema, reflectData);
+        }
+
+        this.encoder = new DataOutputEncoder();
+        this.decoder = new DataInputDecoder();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Serializer Snapshots
+    // ------------------------------------------------------------------------
+
+    /**
+     * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility.
+     */
+    public static final class AvroSchemaSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+        private String schemaString;
+
+        /**
+         * Default constructor for instantiation via reflection.
+         */
+        @SuppressWarnings("unused")
+        public AvroSchemaSerializerConfigSnapshot() {}
+
+        public AvroSchemaSerializerConfigSnapshot(String schemaString) {
+            this.schemaString = checkNotNull(schemaString);
+        }
+
+        public String getSchemaString() {
+            return schemaString;
+        }
+
+        // --- Serialization ---
+
+        @Override
+        public void read(DataInputView in) throws IOException {
+            super.read(in);
+            this.schemaString = in.readUTF();
+        }
+
+        @Override
+        public void write(DataOutputView out) throws IOException {
+            super.write(out);
+            out.writeUTF(schemaString);
+        }

-        if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
-            // resolve Kryo registrations; currently, since the Kryo registrations in Avro
-            // are fixed, there shouldn't be a problem with the resolution here.
+        // --- Version ---

-            LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
-            oldRegistrations.putAll(kryoRegistrations);
+        @Override
+        public int getVersion() {
+            return 1;
+        }

-            for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-                if (reconfiguredRegistrationEntry.getValue().isDummy()) {
-                    return CompatibilityResult.requiresMigration();
-                }
-            }
+        // --- Utils ---

-            this.kryoRegistrations = oldRegistrations;
-            return CompatibilityResult.compatible();
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            else if (obj != null && obj.getClass() == AvroSchemaSerializerConfigSnapshot.class) {
+                final AvroSchemaSerializerConfigSnapshot that = (AvroSchemaSerializerConfigSnapshot) obj;
+                return this.schemaString.equals(that.schemaString);
--- End diff --

Is the schema string guaranteed to be stable, or can it happen that two different Avro versions generate schema strings that Avro thinks are compatible but are slightly different strings?
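A minimal sketch of the comparison the question is getting at (not part of the PR; the class name, helper name, and schema literals below are invented): parsing both snapshot strings and comparing the parsed schemas, or their parsing canonical form, would ignore purely textual differences between the JSON that different Avro versions emit.

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;

// Sketch only: compares two schema strings structurally instead of as raw text,
// so that purely textual differences in the emitted JSON do not register as changes.
public class SchemaStringStability {

    static boolean sameParsedSchema(String schemaStringA, String schemaStringB) {
        Schema a = new Schema.Parser().parse(schemaStringA);
        Schema b = new Schema.Parser().parse(schemaStringB);
        // The "parsing canonical form" normalizes whitespace, JSON attribute order,
        // and attributes that do not affect decoding; Schema.equals(Object) is a
        // stricter structural alternative.
        return SchemaNormalization.toParsingForm(a).equals(SchemaNormalization.toParsingForm(b));
    }

    public static void main(String[] args) {
        // The same record schema, written out in two textually different ways.
        String s1 = "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";
        String s2 = "{ \"name\": \"Event\", \"type\": \"record\", \"fields\": [ {\"name\": \"id\", \"type\": \"long\"} ] }";
        System.out.println(sameParsedSchema(s1, s2)); // prints: true
    }
}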
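As a rough illustration of the overhead argument, a sketch using plain Avro APIs (not Flink code; the Event schema and field names are invented): when writer and reader already agree on the schema, Avro's binary encoding carries only the field values, so a schema that is fixed per stream needs to be shipped once, for example via the type information, instead of with every record.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Sketch only: a GenericRecord round trip where the schema is agreed on up front,
// so the binary encoding carries just the field values and no per-record schema.
public class GenericRecordRoundTrip {

    public static void main(String[] args) throws IOException {
        // Invented example schema; in the proposal this would come from the type information.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"name\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 1L);
        record.put("name", "tick");

        // Serialize: only the field values go into the byte stream.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // Deserialize with the same schema on the reading side.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord copy = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);

        System.out.println(out.size() + " bytes on the wire, record = " + copy);
    }
}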