Hi Arvid,

Okay, I'd better get back to Avro as you suggested!
Thanks for the tips regarding Avro.

Best,

Dongwon

On Wed, Jan 13, 2021 at 3:28 AM Arvid Heise <ar...@ververica.com> wrote:

>> Do you think Scala is a better option in that regard?
>
> I'm not sure whether Scala is better in this regard. Sure, you could use
> sealed classes, but I don't know whether the schema inference makes use of
> them. Maybe @Timo Walther <twal...@apache.org> knows more?
>
>> I used to define Avro records in .avro files and generate Java classes to
>> use in my Flink application, but recently decided to move to POJOs for some
>> reasons (e.g. custom Java annotations, etc).
>>
> If it's just about annotations: you can use custom Java annotations with
> Avro-generated classes by setting the "javaAnnotation" property [1]. You can
> use it on records and fields.
> You can even provide your own Velocity template [2] to add more features
> to the generation.
>
> [1]
> https://www.javacodegeeks.com/2020/07/it-is-never-enough-of-them-enriching-apache-avro-generated-classes-with-custom-java-annotations.html
> [2]
> https://github.com/apache/avro/blob/master/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
>
> On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the very detailed explanation and tips!
>>
>>> inferring the type information of Java classes is quite messy. At first,
>>> it seems like that should work out of the box as you are only using <A> as the
>>> type of the list, right? However, there is no way of knowing if you didn't
>>> use a subclass of A. Of course, if A was final, it might be possible to
>>> exclude this case but you quickly go down a rabbit hole.
>>>
>> Do you think Scala is a better option in that regard?
>>
>>> To avoid such limitations, I always recommend schema-first approaches
>>> when moving a PoC to production code. First figure out what kind of data
>>> you actually want to transfer. Then, settle on a serializer [2]. Then,
>>> create the schema and let the classes be generated (on-the-fly).
>>> I usually do it in two ways: if I write a rather generic program, I try
>>> to use Table API, which optimizes everything; a Row has one of the most
>>> memory-efficient representations.
>>
>> I have to work with DataStream API as I need a custom trigger which is
>> not supported in Table API AFAIK.
>>
>>> If Table API is not sufficient, I fall back to Avro and use Avro specific
>>> records [3].
>>
>> I used to define Avro records in .avro files and generate Java classes to
>> use in my Flink application, but recently decided to move to POJOs for some
>> reasons (e.g. custom Java annotations, etc).
>>
>> So it seems like I'd better be satisfied with BTypeFactory from my original
>> question unless I'm willing to move to Scala or Avro, huh?
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> inferring the type information of Java classes is quite messy. At first,
>>> it seems like that should work out of the box as you are only using <A> as the
>>> type of the list, right? However, there is no way of knowing if you didn't
>>> use a subclass of A. Of course, if A was final, it might be possible to
>>> exclude this case but you quickly go down a rabbit hole.
>>> It gets especially bad if you consider that your classes evolve over
>>> time. What happens if A is first final, but you later decide to subclass
>>> it? How should Flink map old data to the new hierarchy? Flink falls back to
>>> Kryo for most cases, which is why you need generic types. However, that is
>>> rather inefficient unless you register all possible classes beforehand [1].
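
The subclass problem described above can be seen with plain Java reflection, without any Flink dependency. The following is a minimal sketch: `SubA` and the class name `DeclaredVsRuntimeType` are hypothetical names introduced only for illustration, and the fields are made public to keep the example short. It shows that the declared element type of the list is `A`, while the object actually stored in it may be of a different class that a serializer derived from the declaration alone would not know about.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class DeclaredVsRuntimeType {

    public static class A {
        public int n;
    }

    // Hypothetical subclass, not part of the original program.
    public static class SubA extends A {
        public String extra;
    }

    public static class B {
        public List<A> lst = new ArrayList<>();
    }

    // Returns the element type written in the declaration of B.lst.
    static Type declaredElementType() {
        try {
            Field field = B.class.getDeclaredField("lst");
            ParameterizedType listType = (ParameterizedType) field.getGenericType();
            return listType.getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        B b = new B();
        b.lst.add(new SubA()); // legal Java: a SubA is an A

        // The declaration says A, but the element is a SubA at runtime.
        System.out.println("declared: " + declaredElementType().getTypeName());
        System.out.println("runtime:  " + b.lst.get(0).getClass().getName());
    }
}
```

A serializer built only from the declared type would have to decide what to do with the extra field of `SubA`, which is the ambiguity that a generic fallback serializer sidesteps.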
>>>
>>> To avoid such limitations, I always recommend schema-first approaches
>>> when moving a PoC to production code. First figure out what kind of data
>>> you actually want to transfer. Then, settle on a serializer [2]. Then,
>>> create the schema and let the classes be generated (on-the-fly).
>>>
>>> I usually do it in two ways: if I write a rather generic program, I try
>>> to use Table API, which optimizes everything; a Row has one of the most
>>> memory-efficient representations. If Table API is not sufficient, I fall
>>> back to Avro and use Avro specific records [3].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
>>> [2]
>>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>>>
>>>
>>> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>>
>>>> Any advice would be appreciated :)
>>>>
>>>> Thanks,
>>>>
>>>> Dongwon
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>>> Date: Mon, Dec 14, 2020 at 11:27 PM
>>>> Subject: How to gracefully avoid "Generic types have been disabled in
>>>> the ExecutionConfig and type java.util.List is treated as a generic type"?
>>>> To: user <user@flink.apache.org>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> The following program compiles and runs w/o exceptions:
>>>>
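>>>>> import java.util.List;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>> public class Test {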
>>>>>
>>>>>   public static class A {
>>>>>     private int n;
>>>>>
>>>>>     public A() { }
>>>>>     public int getN() {  return n;  }
>>>>>     public void setN(int n) {  this.n = n;  }
>>>>>   }
>>>>>
>>>>>   public static class B {
>>>>>     private List<A> lst;
>>>>>
>>>>>     public B() { }
>>>>>     public List<A> getLst() {  return lst;  }
>>>>>     public void setLst(List<A> lst) {  this.lst = lst;  }
>>>>>   }
>>>>>
>>>>>   public static void main(String[] args) throws Exception {
>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>>>>
>>>>>     env.fromElements(new B())
>>>>>       .print();
>>>>>
>>>>>     env.execute();
>>>>>   }
>>>>> }
>>>>>
>>>>
>>>> When I add the following line,
>>>>
>>>>> env.getConfig().disableGenericTypes();
>>>>
>>>> then the program shows me an exception:
>>>>
>>>>> Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
>>>>> at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>>>> at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>>>> at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>>>> at Test.main(Test.java:29)
>>>>
>>>>
>>>> To avoid this exception, I found that I have to declare a type factory
>>>> like:
>>>>
>>>>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>>>>>     @Override
>>>>>     public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>>>>>       return Types.POJO(
>>>>>         B.class,
>>>>>         ImmutableMap.<String, TypeInformation<?>>builder()
>>>>>           .put("lst", Types.LIST(Types.POJO(A.class)))
>>>>>         .build()
>>>>>       );
>>>>>     }
>>>>>   }
>>>>
>>>> and give it to class B as follows:
>>>>
>>>>>   @TypeInfo(BTypeFactory.class)
>>>>>   public static class B {
>>>>
>>>>
>>>> Is there no other way but to declare BTypeFactory in such cases?
>>>> I don't like having to type a field name twice: once for the member
>>>> variable and once for a Map entry in the TypeInfoFactory.
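
One possible way to avoid repeating field names is to discover them by reflection. The sketch below uses only the JDK and is not taken from Flink: `ListFieldScanner` and `listElementTypes` are hypothetical names, and the approach has not been tested inside a Flink job. It collects every field declared exactly as `java.util.List<X>` together with its element class `X`. Inside a `TypeInfoFactory`, each entry could presumably be turned into `Types.LIST(Types.POJO(elementClass))` and the resulting map passed to `Types.POJO(B.class, ...)`, as in the factory quoted above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ListFieldScanner {

    public static class A {
        private int n;

        public A() { }
        public int getN() { return n; }
        public void setN(int n) { this.n = n; }
    }

    public static class B {
        private List<A> lst;

        public B() { }
        public List<A> getLst() { return lst; }
        public void setLst(List<A> lst) { this.lst = lst; }
    }

    // Maps the name of each instance field declared as List<X> to the class X.
    static Map<String, Class<?>> listElementTypes(Class<?> pojo) {
        Map<String, Class<?>> result = new LinkedHashMap<>();
        for (Field field : pojo.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue; // only regular instance fields are of interest
            }
            if (field.getType() != List.class) {
                continue; // not declared as java.util.List
            }
            Type generic = field.getGenericType();
            if (!(generic instanceof ParameterizedType)) {
                continue; // raw List without a type argument
            }
            Type arg = ((ParameterizedType) generic).getActualTypeArguments()[0];
            if (arg instanceof Class) {
                // Wildcards, type variables and nested generics are skipped.
                result.put(field.getName(), (Class<?>) arg);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Field names come from the class itself, so "lst" is never typed by hand.
        System.out.println(listElementTypes(B.class));
    }
}
```

The trade-off is that the mapping becomes implicit: renaming a field no longer requires touching the factory, but element types that are themselves generic would still need explicit handling.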
>>>>
>>>> Thanks in advance,
>>>>
>>>> Dongwon
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>
