Hi Arvid,

Thanks for the very detailed explanation and tips!

> Inferring the type information of Java classes is quite messy. At first, it
> seems like that should work out of the box as you are only using <A> as the
> type of the list, right? However, there is no way of knowing if you didn't
> use a subclass of A. Of course, if A was final, it might be possible to
> exclude this case but you quickly go down a rabbit hole.

Do you think Scala is a better option in that regard?

> To avoid such limitations, I always recommend schema-first approaches when
> moving a PoC to production code. First figure out what kind of data you
> actually want to transfer. Then, settle on a serializer [2]. Then, create
> the schema and let the classes be generated (on-the-fly).
>
> I usually do it in two ways: if I write a rather generic program, I try to
> use Table API, which optimizes everything in a Row, one of the most
> memory-efficient representations.

I have to work with the DataStream API because I need a custom trigger, which is not supported in the Table API AFAIK.

> If Table API is not sufficient, I fall back to Avro and use Avro specific
> records [3].

I used to define Avro records in .avro files and generate Java classes to use in my Flink application, but recently decided to move to POJOs for several reasons (e.g., custom Java annotations).

So it seems I'd better be satisfied with BTypeFactory from my original question unless I'm willing to move to Scala or Avro, huh?

Best,

Dongwon

On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Dongwon,
>
> Inferring the type information of Java classes is quite messy. At first,
> it seems like that should work out of the box as you are only using <A> as
> the type of the list, right? However, there is no way of knowing if you
> didn't use a subclass of A. Of course, if A was final, it might be possible
> to exclude this case but you quickly go down a rabbit hole.
> It gets especially bad if you consider that your classes evolve over time.
> What happens if A is first final, but you later decide to subclass it? How
> should Flink map old data to the new hierarchy? Flink falls back to Kryo
> for most cases, which is why you need generic types. However, that is
> rather inefficient unless you register all possible classes beforehand [1].
>
> To avoid such limitations, I always recommend schema-first approaches when
> moving a PoC to production code. First figure out what kind of data you
> actually want to transfer. Then, settle on a serializer [2]. Then, create
> the schema and let the classes be generated (on-the-fly).
>
> I usually do it in two ways: if I write a rather generic program, I try to
> use Table API, which optimizes everything in a Row, one of the most
> memory-efficient representations. If Table API is not sufficient, I fall
> back to Avro and use Avro specific records [3].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
> [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>
>
> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Any advice would be appreciated :)
>>
>> Thanks,
>>
>> Dongwon
>>
>> ---------- Forwarded message ---------
>> From: Dongwon Kim <eastcirc...@gmail.com>
>> Date: Mon, Dec 14, 2020 at 11:27 PM
>> Subject: How to gracefully avoid "Generic types have been disabled in the
>> ExecutionConfig and type java.util.List is treated as a generic type"?
>> To: user <user@flink.apache.org>
>>
>>
>> Hi,
>>
>> The following program compiles and runs without exceptions:
>>
>>> import java.util.List;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>> public class Test {
>>>
>>>     public static class A {
>>>         private int n;
>>>
>>>         public A() { }
>>>         public int getN() { return n; }
>>>         public void setN(int n) { this.n = n; }
>>>     }
>>>
>>>     public static class B {
>>>         private List<A> lst;
>>>
>>>         public B() { }
>>>         public List<A> getLst() { return lst; }
>>>         public void setLst(List<A> lst) { this.lst = lst; }
>>>     }
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.createLocalEnvironment();
>>>
>>>         env.fromElements(new B())
>>>             .print();
>>>
>>>         env.execute();
>>>     }
>>> }
>>
>> When I add the following line,
>>
>>> env.getConfig().disableGenericTypes();
>>
>> the program throws an exception:
>>
>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>> Generic types have been disabled in the ExecutionConfig and type
>>> java.util.List is treated as a generic type.
>>>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>>     at Test.main(Test.java:29)
>>
>>
>> To avoid this exception, I found that I have to declare a type factory
>> like:
>>
>>> public static class BTypeFactory extends TypeInfoFactory<B> {
>>>     @Override
>>>     public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>>>         return Types.POJO(
>>>             B.class,
>>>             ImmutableMap.<String, TypeInformation<?>>builder()
>>>                 .put("lst", Types.LIST(Types.POJO(A.class)))
>>>                 .build()
>>>         );
>>>     }
>>> }
>>
>> and give it to class B as follows:
>>
>>> @TypeInfo(BTypeFactory.class)
>>> public static class B {
>>
>>
>> Is there no other way but to declare BTypeFactory in such cases?
>> I don't like the way I have to type a field name twice, once for the member
>> variable and once for a Map entry in the TypeInfoFactory.
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
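
On the complaint about typing each field name twice in BTypeFactory: one possible workaround is a reusable factory that reads the field names by reflection and maps every field declared as List<X> to Types.LIST(...). This is a sketch under stated assumptions, not something Flink ships. The names ListAwareFactoryDemo and ListAwarePojoFactory are hypothetical. The sketch assumes Flink 1.12-era APIs (TypeInfoFactory, @TypeInfo, Types.POJO(Class, Map), Types.LIST, TypeExtractor.createTypeInfo(Type)). It also only handles non-generic POJOs whose fields are declared directly on the annotated class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ListAwareFactoryDemo {

    /**
     * Hypothetical reusable factory: builds POJO type information from the
     * annotated class's own declared fields, so no field name is typed by hand.
     * Fields declared as List<X> become Types.LIST(type info of X); all other
     * fields are handed to the regular TypeExtractor.
     */
    public static class ListAwarePojoFactory<T> extends TypeInfoFactory<T> {

        @Override
        @SuppressWarnings("unchecked")
        public TypeInformation<T> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            if (!(t instanceof Class)) {
                // Assumption of this sketch: only non-generic POJOs are supported.
                throw new IllegalArgumentException("Unsupported type: " + t);
            }
            Class<T> clazz = (Class<T>) t;
            Map<String, TypeInformation<?>> fields = new HashMap<>();
            // Only fields declared directly on the class; inherited fields are ignored.
            for (Field f : clazz.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
                    continue; // not part of the POJO's serialized state
                }
                fields.put(f.getName(), fieldTypeInfo(f.getGenericType()));
            }
            return Types.POJO(clazz, fields);
        }

        private static TypeInformation<?> fieldTypeInfo(Type type) {
            if (type instanceof ParameterizedType
                    && ((ParameterizedType) type).getRawType() == List.class) {
                // List<X>: describe it as a list of X instead of a generic type.
                Type element = ((ParameterizedType) type).getActualTypeArguments()[0];
                return Types.LIST(TypeExtractor.createTypeInfo(element));
            }
            return TypeExtractor.createTypeInfo(type);
        }
    }

    public static class A {
        private int n;

        public A() { }
        public int getN() { return n; }
        public void setN(int n) { this.n = n; }
    }

    // Same B as in the original question, but with no per-class factory.
    @TypeInfo(ListAwarePojoFactory.class)
    public static class B {
        private List<A> lst;

        public B() { }
        public List<A> getLst() { return lst; }
        public void setLst(List<A> lst) { this.lst = lst; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableGenericTypes();
        env.fromElements(new B()).print();
        env.execute();
    }
}
```

Because the element type is fixed to the declared type A, this does not address the subclassing and schema-evolution concerns Arvid raises. It only removes the duplicated field names, and the schema-first approach he recommends still applies.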