tzulitai closed pull request #6151: [FLINK-9569] [avro] Fix confusing construction of GenericRecord AvroSerializers URL: https://github.com/apache/flink/pull/6151
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign (fork-based) pull request once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java index b313625bfe2..09bd5dd8ef7 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java @@ -18,6 +18,7 @@ package org.apache.flink.formats.avro.typeutils; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; @@ -46,6 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import static org.apache.flink.util.Preconditions.checkArgument; @@ -67,6 +69,7 @@ * * @param <T> The type to be serialized. */ +@Internal public class AvroSerializer<T> extends TypeSerializer<T> { private static final long serialVersionUID = 1L; @@ -105,41 +108,54 @@ /** The currently accessing thread, set and checked on debug level only. */ private transient volatile Thread currentThread; - // ------------------------------------------------------------------------ + // ----------------------- instantiation methods -------------------------- /** * Creates a new AvroSerializer for the type indicated by the given class. - * This constructor is intended to be used with {@link SpecificRecord} or reflection serializer. 
- * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)} + * + * <p>This constructor is expected to be used only with {@link GenericRecord}. + * For {@link SpecificRecord} or reflection serializer use {@link AvroSerializer#forNonGeneric(Class)}. + * + * @param schema the explicit schema to use for generic records. */ - public AvroSerializer(Class<T> type) { - checkArgument(!isGenericRecord(type), - "For GenericData.Record use constructor with explicit schema."); - this.type = checkNotNull(type); - this.schemaString = null; + public static AvroSerializer<GenericRecord> forGeneric(Schema schema) { + return new AvroSerializer<>(GenericRecord.class, schema); } /** * Creates a new AvroSerializer for the type indicated by the given class. - * This constructor is expected to be used only with {@link GenericData.Record}. - * For {@link SpecificRecord} or reflection serializer use - * {@link AvroSerializer#AvroSerializer(Class)} + * + * <p>This instantiation method is intended to be used with {@link SpecificRecord} or reflection serializer. + * For serializing {@link GenericData.Record} use {@link AvroSerializer#forGeneric(Schema)}. + * + * @param type the type to be serialized. */ - public AvroSerializer(Class<T> type, Schema schema) { - checkArgument(isGenericRecord(type), - "For classes other than GenericData.Record use constructor without explicit schema."); - this.type = checkNotNull(type); - this.schema = checkNotNull(schema); - this.schemaString = schema.toString(); + public static <T> AvroSerializer<T> forNonGeneric(Class<T> type) { + checkArgument(!isGenericRecord(type), + "For generic records, use AvroSerializer.forGeneric(schema) to provide an explicit schema."); + + return new AvroSerializer<>(type, null); } + + // ------------------------------------------------------------------------ + /** - * @deprecated Use {@link AvroSerializer#AvroSerializer(Class)} instead. + * Private constructor. 
+ * + * @param type the type to be serialized. + * @param schema the explicit schema to use. This is should be non-null only when + * the type to be serialized are {@link GenericRecord}s. */ - @Deprecated - @SuppressWarnings("unused") - public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) { - this(type); + private AvroSerializer(Class<T> type, @Nullable Schema schema) { + this.type = checkNotNull(type); + this.schema = schema; + + if (schema == null) { + this.schemaString = null; + } else { + this.schemaString = schema.toString(); + } } // ------------------------------------------------------------------------ @@ -311,12 +327,7 @@ private static boolean isGenericRecord(Class<?> type) { @Override public TypeSerializer<T> duplicate() { - if (schemaString != null) { - return new AvroSerializer<>(type, schema); - } else { - return new AvroSerializer<>(type); - - } + return new AvroSerializer<>(type, schema); } @Override diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java index 644ee50d361..ca5cf308a2d 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java @@ -82,7 +82,7 @@ public AvroTypeInfo(Class<T> typeClass, boolean useBackwardsCompatibleSerializer public TypeSerializer<T> createSerializer(ExecutionConfig config) { return useBackwardsCompatibleSerializer ? 
new BackwardsCompatibleAvroSerializer<>(getTypeClass()) : - new AvroSerializer<>(getTypeClass()); + AvroSerializer.forNonGeneric(getTypeClass()); } @SuppressWarnings("unchecked") diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java index e5eb5d89a1f..4080e280453 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java @@ -61,7 +61,7 @@ */ public BackwardsCompatibleAvroSerializer(Class<T> type) { this.type = type; - this.serializer = new AvroSerializer<>(type); + this.serializer = AvroSerializer.forNonGeneric(type); } /** diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java index 83d590acaf9..0df85529790 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java @@ -77,7 +77,7 @@ public boolean isKeyType() { @Override public TypeSerializer<GenericRecord> createSerializer(ExecutionConfig config) { - return new AvroSerializer<>(GenericRecord.class, schema); + return AvroSerializer.forGeneric(schema); } @Override diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java index 5744abc1657..e02f9bad646 100644 --- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java @@ -70,7 +70,7 @@ public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegist @Override public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) { - return new AvroSerializer<>(type); + return AvroSerializer.forNonGeneric(type); } @Override diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java index 89be9c06d33..16c594fd736 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java @@ -28,6 +28,6 @@ @Override protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) { - return new AvroSerializer<T>(type); + return AvroSerializer.forNonGeneric(type); } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java index a247766ae15..40811ddd24f 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java @@ -28,6 +28,6 @@ @Override protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new AvroSerializer<>(type); + return AvroSerializer.forNonGeneric(type); } } diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java index 1c1a19b324c..f8b5302f5da 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java @@ -28,6 +28,6 @@ @Override protected <T> TypeSerializer<T> createSerializer(Class<T> type) { - return new AvroSerializer<>(type); + return AvroSerializer.forNonGeneric(type); } } diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java index aaa9b4b08b7..5dcf6a770f4 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java @@ -40,7 +40,7 @@ @Test public void testConcurrentUseOfSerializer() throws Exception { - final AvroSerializer<String> serializer = new AvroSerializer<>(String.class); + final AvroSerializer<String> serializer = AvroSerializer.forNonGeneric(String.class); final BlockerSync sync = new BlockerSync(); diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java index bb3d911b3d3..354b78e79cd 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java +++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java @@ -37,7 +37,7 @@ public void testBookSerialization() { try { Book b = new Book(123, "This is a test book", 26382648); - AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class); + AvroSerializer<Book> serializer = AvroSerializer.forNonGeneric(Book.class); SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b); test.testAll(); } @@ -61,7 +61,7 @@ public void testSerialization() { a.books = books; a.bookType = BookAuthor.BookType.journal; - AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class); + AvroSerializer<BookAuthor> serializer = AvroSerializer.forNonGeneric(BookAuthor.class); SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a); test.testAll(); diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java index c15aa7c0141..65ca4103e31 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java @@ -40,7 +40,7 @@ @Test public void testDeserializeSerializer() throws Exception { - final AvroSerializer<String> currentSerializer = new AvroSerializer<>(String.class); + final AvroSerializer<String> currentSerializer = AvroSerializer.forNonGeneric(String.class); try (ObjectInputStream in = new ObjectInputStream( getClass().getClassLoader().getResourceAsStream(RESOURCE_NAME))) { @@ -57,7 +57,7 @@ public void testDeserializeSerializer() throws Exception { // 
------------------------------------------------------------------------ public static void main(String[] args) throws Exception { - final AvroSerializer<String> serializer = new AvroSerializer<>(String.class); + final AvroSerializer<String> serializer = AvroSerializer.forNonGeneric(String.class); final File file = new File("flink-formats/flink-avro/src/test/resources/" + RESOURCE_NAME).getAbsoluteFile(); diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java index 0ab58683f5a..3d7c099f5c8 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java @@ -32,7 +32,7 @@ @Override protected TypeSerializer<User> createSerializer() { - return new AvroSerializer<>(User.class); + return AvroSerializer.forNonGeneric(User.class); } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services