> Do you think Scala is a better option in that regard?

I'm not sure whether Scala is better in this regard. Sure, you could use
sealed classes, but I don't know whether the schema inference makes use of
them. Maybe @Timo Walther <twal...@apache.org> knows more?

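To illustrate why a closed (final or sealed) hierarchy matters for inference, here is a minimal sketch in plain Java with no Flink involved. The names ErasureSketch and ASub are made up for illustration and are not part of the original program. It shows that reflection only sees the declared element type of the list field, while the list can hold a subclass at runtime.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureSketch {

    public static class A {
        public int n;
    }

    // Hypothetical subclass, only here to show what type inference cannot rule out.
    public static class ASub extends A {
        public String extra = "x";
    }

    public static class B {
        public List<A> lst = new ArrayList<>();
    }

    /** Element type declared for B.lst, as visible through reflection. */
    public static Type declaredElementType() {
        try {
            Field field = B.class.getDeclaredField("lst");
            ParameterizedType listType = (ParameterizedType) field.getGenericType();
            return listType.getActualTypeArguments()[0];
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Runtime class of the first element actually stored in the list. */
    public static Class<?> runtimeElementType(B b) {
        return b.lst.get(0).getClass();
    }

    public static void main(String[] args) {
        B b = new B();
        b.lst.add(new ASub()); // legal, because ASub is an A

        // The declared type says A, the stored element is an ASub.
        System.out.println("declared: " + declaredElementType());
        System.out.println("runtime:  " + runtimeElementType(b));
    }
}
```

A serializer derived only from the declared type would have to assume that every element is exactly an A, which is the assumption that breaks as soon as someone adds a subclass.
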
> I used to define Avro records in .avro files and generate Java classes to
> use in my Flink application, but recently decided to move to POJOs for some
> reasons (e.g. custom Java annotations, etc).

If it's just about annotations: you can use custom Java annotations with
Avro-generated classes by setting the "javaAnnotations" property [1]. It
works on both records and fields. You can even provide your own Velocity
template [2] to add more features to the generation.

[1] https://www.javacodegeeks.com/2020/07/it-is-never-enough-of-them-enriching-apache-avro-generated-classes-with-custom-java-annotations.html
[2] https://github.com/apache/avro/blob/master/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm

On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for the very detailed explanation and tips!
>
>> inferring the type information of Java classes is quite messy. At first,
>> it seems like that should work out of the box as you are only using <A> as
>> the type of the list, right? However, there is no way of knowing if you
>> didn't use a subclass of A. Of course, if A was final, it might be possible
>> to exclude this case but you quickly go down a rabbit hole.
>
> Do you think Scala is a better option in that regard?
>
>> To avoid such limitations, I always recommend schema-first approaches when
>> moving a PoC to production code. First figure out what kind of data you
>> actually want to transfer. Then, settle for a serializer [2]. Then, create
>> the schema and let the classes be generated (on-the-fly).
>> I usually do it in two ways: if I write a rather generic program, I try
>> to use Table API, which optimizes everything, and a Row has one of the most
>> memory-efficient representations.
>
> I have to work with DataStream API as I need a custom trigger which is not
> supported in Table API AFAIK.
>
>> If Table API is not sufficient, I fall back to Avro and use Avro specific
>> records [3].
>
> I used to define Avro records in .avro files and generate Java classes to
> use in my Flink application, but recently decided to move to POJOs for some
> reasons (e.g. custom Java annotations, etc).
>
> So it seems like I'd better be satisfied with BTypeFactory in my original
> question unless I'm willing to move to Scala or Avro, huh?
>
> Best,
>
> Dongwon
>
> On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Dongwon,
>>
>> inferring the type information of Java classes is quite messy. At first,
>> it seems like that should work out of the box as you are only using <A> as
>> the type of the list, right? However, there is no way of knowing if you
>> didn't use a subclass of A. Of course, if A was final, it might be possible
>> to exclude this case but you quickly go down a rabbit hole.
>> It gets especially bad if you consider that your classes evolve over
>> time. What happens if A is first final, but you later decide to subclass
>> it? How should Flink map old data to the new hierarchy? Flink falls back to
>> Kryo for most cases, which is why you need generic types. However, that is
>> rather inefficient unless you register all possible classes beforehand [1].
>>
>> To avoid such limitations, I always recommend schema-first approaches
>> when moving a PoC to production code. First figure out what kind of data
>> you actually want to transfer. Then, settle for a serializer [2]. Then,
>> create the schema and let the classes be generated (on-the-fly).
>>
>> I usually do it in two ways: if I write a rather generic program, I try
>> to use Table API, which optimizes everything, and a Row has one of the most
>> memory-efficient representations. If Table API is not sufficient, I fall
>> back to Avro and use Avro specific records [3].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
>> [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>>
>>
>> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Any advice would be appreciated :)
>>>
>>> Thanks,
>>>
>>> Dongwon
>>>
>>> ---------- Forwarded message ---------
>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>> Date: Mon, Dec 14, 2020 at 11:27 PM
>>> Subject: How to gracefully avoid "Generic types have been disabled in
>>> the ExecutionConfig and type java.util.List is treated as a generic type"?
>>> To: user <user@flink.apache.org>
>>>
>>>
>>> Hi,
>>>
>>> The following program compiles and runs w/o exceptions:
>>>
>>>> import java.util.List;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>> public class Test {
>>>>
>>>>     public static class A {
>>>>         private int n;
>>>>
>>>>         public A() { }
>>>>         public int getN() { return n; }
>>>>         public void setN(int n) { this.n = n; }
>>>>     }
>>>>
>>>>     public static class B {
>>>>         private List<A> lst;
>>>>
>>>>         public B() { }
>>>>         public List<A> getLst() { return lst; }
>>>>         public void setLst(List<A> lst) { this.lst = lst; }
>>>>     }
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>         StreamExecutionEnvironment env =
>>>>             StreamExecutionEnvironment.createLocalEnvironment();
>>>>
>>>>         env.fromElements(new B())
>>>>             .print();
>>>>
>>>>         env.execute();
>>>>     }
>>>> }
>>>
>>> When I add the following line,
>>>
>>>> env.getConfig().disableGenericTypes();
>>>
>>> then the program shows me an exception:
>>>
>>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>>> Generic types have been disabled in the ExecutionConfig and type
>>>> java.util.List is treated as a generic type.
>>>>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>>>     at Test.main(Test.java:29)
>>>
>>>
>>> To avoid this exception, I found that I have to declare a type factory
>>> like:
>>>
>>>> public static class BTypeFactory extends TypeInfoFactory<B> {
>>>>     @Override
>>>>     public TypeInformation<B> createTypeInfo(Type t, Map<String,
>>>>             TypeInformation<?>> genericParameters) {
>>>>         return Types.POJO(
>>>>             B.class,
>>>>             ImmutableMap.<String, TypeInformation<?>>builder()
>>>>                 .put("lst", Types.LIST(Types.POJO(A.class)))
>>>>                 .build()
>>>>         );
>>>>     }
>>>> }
>>>
>>> and give it to class B as follows:
>>>
>>>> @TypeInfo(BTypeFactory.class)
>>>> public static class B {
>>>
>>>
>>> Is there no other way but to declare BTypeFactory in such cases?
>>> I don't like the way I have to type a field name twice, once for a member
>>> variable and once for a Map entry in TypeInfoFactory.
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>
>>
>> --
>> Arvid Heise | Senior Java Developer
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

--
Arvid Heise | Senior Java Developer