Hi Dongwon,

Inferring the type information of Java classes is quite messy. At first, it
seems like this should work out of the box, as you are only using <A> as the
element type of the list, right? However, there is no way of knowing that
you didn't put a subclass of A into it. Of course, if A were final, it might
be possible to exclude this case, but you quickly go down a rabbit hole.
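To make the subclass problem concrete, here is a small plain-Java sketch (no
Flink involved; the class SubclassDemo and its helpers are made up for
illustration). The declared field type still says List<A>, but nothing stops
a SubA from ending up in the list at runtime, so anything that only looks at
the declaration cannot know which classes the elements really have.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class SubclassDemo {
    public static class A {
        public int n;
    }

    // Hypothetical subclass with an extra field the declared type knows nothing about.
    public static class SubA extends A {
        public String extra = "surprise";
    }

    public static class B {
        public List<A> lst = new ArrayList<>();
    }

    // What static type extraction can see: the declared element type of B.lst.
    public static Type declaredElementType() {
        try {
            Field f = B.class.getDeclaredField("lst");
            ParameterizedType pt = (ParameterizedType) f.getGenericType();
            return pt.getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // What actually ends up in the list at runtime.
    public static List<Class<?>> runtimeElementClasses(B b) {
        List<Class<?>> classes = new ArrayList<>();
        for (A a : b.lst) {
            classes.add(a.getClass());
        }
        return classes;
    }

    public static void main(String[] args) {
        B b = new B();
        b.lst.add(new A());
        b.lst.add(new SubA()); // compiles fine: a SubA is an A
        System.out.println("declared: " + declaredElementType().getTypeName());
        System.out.println("runtime:  " + runtimeElementClasses(b));
    }
}
```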
It gets especially bad if you consider that your classes evolve over time.
What happens if A is final at first, but you later decide to subclass it? How
should Flink map old data to the new hierarchy? For most of these cases,
Flink falls back to Kryo, which is why generic types need to stay enabled.
However, Kryo is rather inefficient unless you register all possible classes
beforehand [1].
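As a rough toy model of why registration matters (hypothetical code, not
Kryo's actual wire format): an unregistered class has to be identified by its
fully qualified name in every record, while a registered class can be
identified by a small id that writer and reader agreed on upfront. In Flink,
the registration itself goes through the ExecutionConfig, e.g.
env.getConfig().registerKryoType(A.class), as described in [1].

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RegistrationDemo {
    // Hypothetical toy registry: class -> small id, agreed on before any data is written.
    private final Map<Class<?>, Integer> registered = new HashMap<>();

    public void register(Class<?> cls) {
        registered.putIfAbsent(cls, registered.size());
    }

    // Writes only the "which class is this?" tag that would precede each record's payload.
    public byte[] writeClassTag(Class<?> cls) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            Integer id = registered.get(cls);
            if (id != null) {
                out.writeByte(1);            // marker: registered
                out.writeByte(id);           // small id (toy assumption: fewer than 256 classes)
            } else {
                out.writeByte(0);            // marker: unregistered
                out.writeUTF(cls.getName()); // full class name, written again for every record
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory stream
        }
    }

    public static class A {
        int n;
    }

    public static void main(String[] args) {
        RegistrationDemo plain = new RegistrationDemo();
        RegistrationDemo withRegistry = new RegistrationDemo();
        withRegistry.register(A.class);
        System.out.println("unregistered tag: " + plain.writeClassTag(A.class).length + " bytes");
        System.out.println("registered tag:   " + withRegistry.writeClassTag(A.class).length + " bytes");
    }
}
```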

To avoid such limitations, I always recommend schema-first approaches when
moving a PoC to production code. First figure out what kind of data you
actually want to transfer. Then, settle on a serializer [2]. Then, create
the schema and let the classes be generated (on-the-fly).

I usually go one of two ways: if I write a rather generic program, I try to
use the Table API, which optimizes everything into a Row, one of the most
memory-efficient representations. If the Table API is not sufficient, I fall
back to Avro and use Avro specific records [3].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
[2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
[3] https://avro.apache.org/docs/current/gettingstartedjava.html


On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Any advice would be appreciated :)
>
> Thanks,
>
> Dongwon
>
> ---------- Forwarded message ---------
> From: Dongwon Kim <eastcirc...@gmail.com>
> Date: Mon, Dec 14, 2020 at 11:27 PM
> Subject: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?
> To: user <user@flink.apache.org>
>
>
> Hi,
>
> The following program compiles and runs w/o exceptions:
>
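>> import java.util.List;
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> public class Test {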
>>
>>   public static class A {
>>     private int n;
>>
>>     public A() { }
>>     public int getN() {  return n;  }
>>     public void setN(int n) {  this.n = n;  }
>>   }
>>
>>   public static class B {
>>     private List<A> lst;
>>
>>     public B() { }
>>     public List<A> getLst() {  return lst;  }
>>     public void setLst(List<A> lst) {  this.lst = lst;  }
>>   }
>>
>>   public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>
>>     env.fromElements(new B())
>>       .print();
>>
>>     env.execute();
>>   }
>> }
>>
>
> When I add the following line,
>
>> env.getConfig().disableGenericTypes();
>
> then the program shows me an exception:
>
>> Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
>>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>     at Test.main(Test.java:29)
>
>
> To avoid this exception, I found that I have to declare a type factory
> like:
>
>>   public static class BTypeFactory extends TypeInfoFactory<B> {
>>     @Override
>>     public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>>       return Types.POJO(
>>         B.class,
>>         ImmutableMap.<String, TypeInformation<?>>builder()
>>           .put("lst", Types.LIST(Types.POJO(A.class)))
>>           .build()
>>       );
>>     }
>>   }
>
> and give it to class B as follows:
>
>>   @TypeInfo(BTypeFactory.class)
>>   public static class B {
>
>
> Is there no other way than declaring BTypeFactory in such cases?
> I don't like having to type a field name twice: once for the member
> variable and once for a Map entry in the TypeInfoFactory.
>
> Thanks in advance,
>
> Dongwon
>
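On the question of typing the field name twice: one option is to derive the
names via reflection instead of hard-coding them. The sketch below uses a
hypothetical helper, listElementTypes (not part of Flink), that collects only
the fields declared as List<X> together with their element classes. Inside a
TypeInfoFactory, each entry could then be turned into
Types.LIST(Types.POJO(elementClass)) and passed to Types.POJO(B.class,
fields). Note that the map passed to Types.POJO is meant to describe the
POJO's fields, so any non-List fields would still need entries of their own.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ListFieldScanner {
    // Hypothetical helper: field name -> element class for every non-static field
    // declared directly on `pojo` as List<X> with a concrete class X.
    // Superclass fields and all other field types are ignored.
    public static Map<String, Class<?>> listElementTypes(Class<?> pojo) {
        Map<String, Class<?>> result = new LinkedHashMap<>();
        for (Field f : pojo.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue;
            }
            Type generic = f.getGenericType();
            if (!(generic instanceof ParameterizedType)) {
                continue;
            }
            ParameterizedType pt = (ParameterizedType) generic;
            if (pt.getRawType() != List.class) {
                continue;
            }
            Type arg = pt.getActualTypeArguments()[0];
            if (arg instanceof Class) {
                result.put(f.getName(), (Class<?>) arg);
            }
        }
        return result;
    }

    public static class A {
        private int n;
    }

    public static class B {
        private List<A> lst;
        private String name; // not a List, so the helper skips it
    }

    public static void main(String[] args) {
        // In a TypeInfoFactory<B> (needs Flink, not compiled here), each entry could become
        //   fields.put(name, Types.LIST(Types.POJO(elementClass)));
        System.out.println(listElementTypes(B.class));
    }
}
```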


-- 

Arvid Heise | Senior Java Developer

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
