Hi Dongwon, inferring the type information of Java classes is quite messy. At first, it seems like that should work out the box as you are only using <A> as the type of the list, right? However, there is no way of knowing if you didn't use a subclass of A. Of course, if A was final, it might be possible to exclude this case but you quickly go down a rabbit hole. It gets especially bad if you consider that your classes evolve over time. What happens if A is first final, but you later decide to subclass it? How should Flink map old data to the new hierarchy? Flink falls back to Kryo for most cases, which is why you need generic types. However, that is rather inefficient unless you register all possible classes beforehand [1].
To avoid such limitations, I always recommend schema-first approaches when moving a PoC to production code. First figure out what kind of data you actually want to transfer. Then, settle for a serializer [2]. Then, create the schema and let the classes be generated (on-the-fly). I usually do it in two ways: if I write a rather generic program, I try to use Table API, which optimizes everything in a Row has one of the most memory efficient representations. If Table API is not sufficient, I fall back to Avro and use Avro specific records [3]. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html [3] https://avro.apache.org/docs/current/gettingstartedjava.html On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <eastcirc...@gmail.com> wrote: > Any advice would be appreciated :) > > Thanks, > > Dongwon > > ---------- Forwarded message --------- > From: Dongwon Kim <eastcirc...@gmail.com> > Date: Mon, Dec 14, 2020 at 11:27 PM > Subject: How to gracefully avoid "Generic types have been disabled in the > ExecutionConfig and type java.util.List is treated as a generic type"? > To: user <user@flink.apache.org> > > > Hi, > > The following program compiles and runs w/o exceptions: > >> public class Test { >> >> public static class A { >> private int n; >> >> public A() { } >> public int getN() { return n; } >> public void setN(int n) { this.n = n; } >> } >> >> public static class B { >> private List<A> lst; >> >> public B() { } >> public List<A> getLst() { return lst; } >> public void setLst(List<A> lst) { this.lst = lst; } >> } >> >> public static void main(String[] args) throws Exception { >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.createLocalEnvironment(); >> >> env.fromElements(new B()) >> .print(); >> >> env.execute(); >> } >> } >> > > When I add the following line, > >> env.getConfig().disableGenericTypes(); > > then the program shows me an exception: > >> Exception in thread "main" java.lang.UnsupportedOperationException: >> Generic types have been disabled in the ExecutionConfig and type >> java.util.List is treated as a generic type. >> at >> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86) >> at >> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319) >> at >> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311) >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970) >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871) >> at Test.main(Test.java:29) > > > To avoid this exception, I found that I have to declare a type factory > like: > >> public static class BTypeFactory extends TypeInfoFactory<B> { >> @Override >> public TypeInformation<B> createTypeInfo(Type t, Map<String, >> TypeInformation<?>> genericParameters) { >> return Types.POJO( >> B.class, >> ImmutableMap.<String, TypeInformation<?>>builder() >> .put("lst", Types.LIST(Types.POJO(A.class))) >> .build() >> ); >> } >> } > > and give it to class B as follows: > >> @TypeInfo(BTypeFactory.class) >> public static class B { > > > Is there no other way but to declare BTypeFactory in such cases? > I don't like the way I have to type a field name twice, one for a member > variable and the other for an Map entry in TypeInfoFactory. > > Thanks in advance, > > Dongwon > -- Arvid Heise | Senior Java Developer <https://www.ververica.com/> Follow us @VervericaData -- Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference Stream Processing | Event Driven | Real Time -- Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany -- Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng