[ 
https://issues.apache.org/jira/browse/FLINK-9569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16669656#comment-16669656
 ] 

ASF GitHub Bot commented on FLINK-9569:
---------------------------------------

tzulitai closed pull request #6151: [FLINK-9569] [avro] Fix confusing 
construction of GenericRecord AvroSerializers
URL: https://github.com/apache/flink/pull/6151
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index b313625bfe2..09bd5dd8ef7 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
@@ -46,6 +47,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -67,6 +69,7 @@
  *
  * @param <T> The type to be serialized.
  */
+@Internal
 public class AvroSerializer<T> extends TypeSerializer<T> {
 
        private static final long serialVersionUID = 1L;
@@ -105,41 +108,54 @@
        /** The currently accessing thread, set and checked on debug level 
only. */
        private transient volatile Thread currentThread;
 
-       // 
------------------------------------------------------------------------
+       // ----------------------- instantiation methods 
--------------------------
 
        /**
         * Creates a new AvroSerializer for the type indicated by the given 
class.
-        * This constructor is intended to be used with {@link SpecificRecord} 
or reflection serializer.
-        * For serializing {@link GenericData.Record} use {@link 
AvroSerializer#AvroSerializer(Class, Schema)}
+        *
+        * <p>This constructor is expected to be used only with {@link 
GenericRecord}.
+        * For {@link SpecificRecord} or reflection serializer use {@link 
AvroSerializer#forNonGeneric(Class)}.
+        *
+        * @param schema the explicit schema to use for generic records.
         */
-       public AvroSerializer(Class<T> type) {
-               checkArgument(!isGenericRecord(type),
-                       "For GenericData.Record use constructor with explicit 
schema.");
-               this.type = checkNotNull(type);
-               this.schemaString = null;
+       public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
+               return new AvroSerializer<>(GenericRecord.class, schema);
        }
 
        /**
         * Creates a new AvroSerializer for the type indicated by the given 
class.
-        * This constructor is expected to be used only with {@link 
GenericData.Record}.
-        * For {@link SpecificRecord} or reflection serializer use
-        * {@link AvroSerializer#AvroSerializer(Class)}
+        *
+        * <p>This instantiation method is intended to be used with {@link 
SpecificRecord} or reflection serializer.
+        * For serializing {@link GenericData.Record} use {@link 
AvroSerializer#forGeneric(Schema)}.
+        *
+        * @param type the type to be serialized.
         */
-       public AvroSerializer(Class<T> type, Schema schema) {
-               checkArgument(isGenericRecord(type),
-                       "For classes other than GenericData.Record use 
constructor without explicit schema.");
-               this.type = checkNotNull(type);
-               this.schema = checkNotNull(schema);
-               this.schemaString = schema.toString();
+       public static <T> AvroSerializer<T> forNonGeneric(Class<T> type) {
+               checkArgument(!isGenericRecord(type),
+                       "For generic records, use 
AvroSerializer.forGeneric(schema) to provide an explicit schema.");
+
+               return new AvroSerializer<>(type, null);
        }
 
+
+       // 
------------------------------------------------------------------------
+
        /**
-        * @deprecated Use {@link AvroSerializer#AvroSerializer(Class)} instead.
+        * Private constructor.
+        *
+        * @param type the type to be serialized.
+        * @param schema the explicit schema to use. This is should be non-null 
only when
+        *               the type to be serialized are {@link GenericRecord}s.
         */
-       @Deprecated
-       @SuppressWarnings("unused")
-       public AvroSerializer(Class<T> type, Class<? extends T> 
typeToInstantiate) {
-               this(type);
+       private AvroSerializer(Class<T> type, @Nullable Schema schema) {
+               this.type = checkNotNull(type);
+               this.schema = schema;
+
+               if (schema == null) {
+                       this.schemaString = null;
+               } else {
+                       this.schemaString = schema.toString();
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -311,12 +327,7 @@ private static boolean isGenericRecord(Class<?> type) {
 
        @Override
        public TypeSerializer<T> duplicate() {
-               if (schemaString != null) {
-                       return new AvroSerializer<>(type, schema);
-               } else {
-                       return new AvroSerializer<>(type);
-
-               }
+               return new AvroSerializer<>(type, schema);
        }
 
        @Override
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 644ee50d361..ca5cf308a2d 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -82,7 +82,7 @@ public AvroTypeInfo(Class<T> typeClass, boolean 
useBackwardsCompatibleSerializer
        public TypeSerializer<T> createSerializer(ExecutionConfig config) {
                return useBackwardsCompatibleSerializer ?
                                new 
BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
-                               new AvroSerializer<>(getTypeClass());
+                               AvroSerializer.forNonGeneric(getTypeClass());
        }
 
        @SuppressWarnings("unchecked")
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
index e5eb5d89a1f..4080e280453 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
@@ -61,7 +61,7 @@
         */
        public BackwardsCompatibleAvroSerializer(Class<T> type) {
                this.type = type;
-               this.serializer = new AvroSerializer<>(type);
+               this.serializer = AvroSerializer.forNonGeneric(type);
        }
 
        /**
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
index 83d590acaf9..0df85529790 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
@@ -77,7 +77,7 @@ public boolean isKeyType() {
 
        @Override
        public TypeSerializer<GenericRecord> createSerializer(ExecutionConfig 
config) {
-               return new AvroSerializer<>(GenericRecord.class, schema);
+               return AvroSerializer.forGeneric(schema);
        }
 
        @Override
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index 5744abc1657..e02f9bad646 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -70,7 +70,7 @@ public void 
addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegist
 
        @Override
        public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) {
-               return new AvroSerializer<>(type);
+               return AvroSerializer.forNonGeneric(type);
        }
 
        @Override
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
index 89be9c06d33..16c594fd736 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
@@ -28,6 +28,6 @@
 
        @Override
        protected <T> TypeSerializer<T> createComponentSerializer(Class<T> 
type) {
-               return new AvroSerializer<T>(type);
+               return AvroSerializer.forNonGeneric(type);
        }
 }
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
index a247766ae15..40811ddd24f 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
@@ -28,6 +28,6 @@
 
        @Override
        protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new AvroSerializer<>(type);
+               return AvroSerializer.forNonGeneric(type);
        }
 }
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
index 1c1a19b324c..f8b5302f5da 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
@@ -28,6 +28,6 @@
 
        @Override
        protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-               return new AvroSerializer<>(type);
+               return AvroSerializer.forNonGeneric(type);
        }
 }
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
index aaa9b4b08b7..5dcf6a770f4 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
@@ -40,7 +40,7 @@
 
        @Test
        public void testConcurrentUseOfSerializer() throws Exception {
-               final AvroSerializer<String> serializer = new 
AvroSerializer<>(String.class);
+               final AvroSerializer<String> serializer = 
AvroSerializer.forNonGeneric(String.class);
 
                final BlockerSync sync = new BlockerSync();
 
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
index bb3d911b3d3..354b78e79cd 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
@@ -37,7 +37,7 @@
        public void testBookSerialization() {
                try {
                        Book b = new Book(123, "This is a test book", 26382648);
-                       AvroSerializer<Book> serializer = new 
AvroSerializer<Book>(Book.class);
+                       AvroSerializer<Book> serializer = 
AvroSerializer.forNonGeneric(Book.class);
                        SerializerTestInstance<Book> test = new 
SerializerTestInstance<Book>(serializer, Book.class, -1, b);
                        test.testAll();
                }
@@ -61,7 +61,7 @@ public void testSerialization() {
                        a.books = books;
                        a.bookType = BookAuthor.BookType.journal;
 
-                       AvroSerializer<BookAuthor> serializer = new 
AvroSerializer<BookAuthor>(BookAuthor.class);
+                       AvroSerializer<BookAuthor> serializer = 
AvroSerializer.forNonGeneric(BookAuthor.class);
 
                        SerializerTestInstance<BookAuthor> test = new 
SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
                        test.testAll();
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
index c15aa7c0141..65ca4103e31 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
@@ -40,7 +40,7 @@
 
        @Test
        public void testDeserializeSerializer() throws Exception {
-               final AvroSerializer<String> currentSerializer = new 
AvroSerializer<>(String.class);
+               final AvroSerializer<String> currentSerializer = 
AvroSerializer.forNonGeneric(String.class);
 
                try (ObjectInputStream in = new ObjectInputStream(
                                
getClass().getClassLoader().getResourceAsStream(RESOURCE_NAME))) {
@@ -57,7 +57,7 @@ public void testDeserializeSerializer() throws Exception {
        // 
------------------------------------------------------------------------
 
        public static void main(String[] args) throws Exception {
-               final AvroSerializer<String> serializer = new 
AvroSerializer<>(String.class);
+               final AvroSerializer<String> serializer = 
AvroSerializer.forNonGeneric(String.class);
 
                final File file = new 
File("flink-formats/flink-avro/src/test/resources/" + 
RESOURCE_NAME).getAbsoluteFile();
 
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
index 0ab58683f5a..3d7c099f5c8 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
@@ -32,7 +32,7 @@
 
        @Override
        protected TypeSerializer<User> createSerializer() {
-               return new AvroSerializer<>(User.class);
+               return AvroSerializer.forNonGeneric(User.class);
        }
 
        @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Confusing construction of AvroSerializers for generic records
> -------------------------------------------------------------
>
>                 Key: FLINK-9569
>                 URL: https://issues.apache.org/jira/browse/FLINK-9569
>             Project: Flink
>          Issue Type: Improvement
>          Components: Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: pull-request-available
>
> The {{AvroSerializer}} currently has a public {{AvroSerializer(Class<T> type, 
> Schema schema)}} constructor intended for use with generic records.
> This is a bit confusing, because when using that constructor the type to be 
> serialized must always be {{GenericData.Record}}, so the class argument is redundant.
> We should either:
> - have a separate subclass of {{AvroSerializer}}, say 
> {{GenericRecordAvroSerializer}} that is a {{AvroSerializer<GenericRecord>}}, 
> or
> - follow a similar approach to the instantiation methods in 
> {{AvroDeserializationSchema}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to