>
> Do you think Scala is a better option in that regard?

I'm not sure if Scala is better in this regard. Sure, you could use sealed
classes, but I don't know whether the schema inference makes use of them.
Maybe @Timo Walther <twal...@apache.org> knows more?
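
For context, here is a minimal, stdlib-only Java sketch of the ambiguity in
question: reflection reveals the declared element type of a List<A> field, but
nothing stops the list from holding a subclass at runtime. The class
ExtendedA and the two helper methods are made up for illustration, and this is
not how Flink's type extraction is actually implemented.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.List;

class SubclassAmbiguityDemo {

  static class A {
    int n;
  }

  // Hypothetical subclass; nothing in the declaration of B rules it out.
  static class ExtendedA extends A {
    String label;
  }

  static class B {
    List<A> lst = new ArrayList<>();
  }

  // What can be learned from the declaration alone: the element type of B.lst.
  static Class<?> declaredElementType() {
    try {
      Field field = B.class.getDeclaredField("lst");
      ParameterizedType type = (ParameterizedType) field.getGenericType();
      return (Class<?>) type.getActualTypeArguments()[0];
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException(e);
    }
  }

  // What is actually stored in the list at runtime.
  static Class<?> runtimeElementType(B b) {
    return b.lst.get(0).getClass();
  }

  public static void main(String[] args) {
    B b = new B();
    b.lst.add(new ExtendedA()); // compiles fine: ExtendedA is an A
    System.out.println("declared: " + declaredElementType().getSimpleName());
    System.out.println("runtime:  " + runtimeElementType(b).getSimpleName());
  }
}
```

Running main prints A as the declared type and ExtendedA as the runtime type,
which is the mismatch a serializer derived purely from the declaration cannot
account for.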

> I used to define Avro records in .avro files and generate Java classes to
> use in my Flink application, but recently decided to move to POJOs for some
> reasons (e.g. custom Java annotations, etc).
>
If it's just about annotations: you can use custom Java annotations with
Avro-generated classes by setting the "javaAnnotation" property [1]. It
works on both records and fields.
You can even provide your own Velocity template [2] to add more features to
the generated code.
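
As a rough sketch of the end result: the property (spelled "javaAnnotation"
in [1]) takes the fully qualified annotation name, and as far as I know the
compiler copies it verbatim above the generated class or field. In the example
below, Audited and User are hypothetical names, and User is a hand-written
stand-in rather than real compiler output; the only point is that the
annotation is then visible to ordinary reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class GeneratedAnnotationDemo {

  // Hypothetical custom annotation; any annotation on the classpath would do.
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.TYPE, ElementType.FIELD})
  @interface Audited { }

  // Assumed schema fragment (not taken from the thread):
  //   { "type": "record", "name": "User",
  //     "javaAnnotation": "example.Audited",
  //     "fields": [
  //       { "name": "id", "type": "long", "javaAnnotation": "example.Audited" },
  //       { "name": "name", "type": "string" } ] }
  //
  // Hand-written stand-in for the class the Avro compiler would generate.
  @Audited
  static class User {
    @Audited
    private long id;
    private String name;
  }

  static boolean classIsAnnotated() {
    return User.class.isAnnotationPresent(Audited.class);
  }

  static boolean fieldIsAnnotated(String fieldName) {
    try {
      return User.class.getDeclaredField(fieldName).isAnnotationPresent(Audited.class);
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("class annotated: " + classIsAnnotated());
    System.out.println("id annotated:    " + fieldIsAnnotated("id"));
    System.out.println("name annotated:  " + fieldIsAnnotated("name"));
  }
}
```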

[1]
https://www.javacodegeeks.com/2020/07/it-is-never-enough-of-them-enriching-apache-avro-generated-classes-with-custom-java-annotations.html
[2]
https://github.com/apache/avro/blob/master/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm

On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for the very detailed explanation and tips!
>
>> inferring the type information of Java classes is quite messy. At first,
>> it seems like that should work out of the box as you are only using <A> as the
>> type of the list, right? However, there is no way of knowing if you didn't
>> use a subclass of A. Of course, if A was final, it might be possible to
>> exclude this case but you quickly go down a rabbit hole.
>>
> Do you think Scala is a better option in that regard?
>
>> To avoid such limitations, I always recommend schema-first approaches when
>> moving a PoC to production code. First figure out what kind of data you
>> actually want to transfer. Then, settle for a serializer [2]. Then, create
>> the schema and let the classes be generated (on-the-fly).
>> I usually do it in two ways: if I write a rather generic program, I try
>> to use Table API, which optimizes everything, and Row has one of the most
>> memory-efficient representations.
>
> I have to work with DataStream API as I need a custom trigger which is not
> supported in Table API AFAIK.
>
>> If Table API is not sufficient, I fall back to Avro and use Avro specific
>> records [3].
>
> I used to define Avro records in .avro files and generate Java classes to
> use in my Flink application, but recently decided to move to POJOs for some
> reasons (e.g. custom Java annotations, etc).
>
> So it seems like I'd better settle for BTypeFactory from my original
> question unless I'm willing to move to Scala or Avro, huh?
>
> Best,
>
> Dongwon
>
> On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Dongwon,
>>
>> inferring the type information of Java classes is quite messy. At first,
>> it seems like that should work out of the box as you are only using <A> as the
>> type of the list, right? However, there is no way of knowing if you didn't
>> use a subclass of A. Of course, if A was final, it might be possible to
>> exclude this case but you quickly go down a rabbit hole.
>> It gets especially bad if you consider that your classes evolve over
>> time. What happens if A is first final, but you later decide to subclass
>> it? How should Flink map old data to the new hierarchy? Flink falls back to
>> Kryo for most cases, which is why you need generic types. However, that is
>> rather inefficient unless you register all possible classes beforehand [1].
>>
>> To avoid such limitations, I always recommend schema-first approaches
>> when moving a PoC to production code. First figure out what kind of data
>> you actually want to transfer. Then, settle for a serializer [2]. Then,
>> create the schema and let the classes be generated (on-the-fly).
>>
>> I usually do it in two ways: if I write a rather generic program, I try
>> to use Table API, which optimizes everything, and Row has one of the most
>> memory-efficient representations. If Table API is not sufficient, I fall
>> back to Avro and use Avro specific records [3].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
>> [2]
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>>
>>
>> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Any advice would be appreciated :)
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>> ---------- Forwarded message ---------
>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>> Date: Mon, Dec 14, 2020 at 11:27 PM
>>> Subject: How to gracefully avoid "Generic types have been disabled in
>>> the ExecutionConfig and type java.util.List is treated as a generic type"?
>>> To: user <user@flink.apache.org>
>>>
>>>
>>> Hi,
>>>
>>> The following program compiles and runs w/o exceptions:
>>>
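>>>> import java.util.List;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>> public class Test {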
>>>>
>>>>   public static class A {
>>>>     private int n;
>>>>
>>>>     public A() { }
>>>>     public int getN() {  return n;  }
>>>>     public void setN(int n) {  this.n = n;  }
>>>>   }
>>>>
>>>>   public static class B {
>>>>     private List<A> lst;
>>>>
>>>>     public B() { }
>>>>     public List<A> getLst() {  return lst;  }
>>>>     public void setLst(List<A> lst) {  this.lst = lst;  }
>>>>   }
>>>>
>>>>   public static void main(String[] args) throws Exception {
>>>>     StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.createLocalEnvironment();
>>>>
>>>>     env.fromElements(new B())
>>>>       .print();
>>>>
>>>>     env.execute();
>>>>   }
>>>> }
>>>>
>>>
>>> When I add the following line,
>>>
>>>> env.getConfig().disableGenericTypes();
>>>
>>> then the program shows me an exception:
>>>
>>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>>> Generic types have been disabled in the ExecutionConfig and type
>>>> java.util.List is treated as a generic type.
>>>> at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>>> at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>>> at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>>> at Test.main(Test.java:29)
>>>
>>>
>>> To avoid this exception, I found that I have to declare a type factory
>>> like:
>>>
>>>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>>>>     @Override
>>>>     public TypeInformation<B> createTypeInfo(
>>>>         Type t, Map<String, TypeInformation<?>> genericParameters) {
>>>>       return Types.POJO(
>>>>         B.class,
>>>>         ImmutableMap.<String, TypeInformation<?>>builder()
>>>>           .put("lst", Types.LIST(Types.POJO(A.class)))
>>>>         .build()
>>>>       );
>>>>     }
>>>>   }
>>>
>>> and give it to class B as follows:
>>>
>>>>   @TypeInfo(BTypeFactory.class)
>>>>   public static class B {
>>>
>>>
>>> Is there no other way but to declare BTypeFactory in such cases?
>>> I don't like having to type a field name twice, once for the member
>>> variable and once for the Map entry in the TypeInfoFactory.
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

