Hi Arvid,

Okay, I'd better get back to Avro as you suggested! Thanks for the tips regarding Avro.

Best,
Dongwon

On Wed, Jan 13, 2021 at 3:28 AM Arvid Heise <ar...@ververica.com> wrote:

>> Do you think Scala is a better option in that regard?
>
> I'm not sure if Scala is better in this regard. Sure, you could use sealed
> classes, but I don't know if the schema inference makes use of them. Maybe
> @Timo Walther <twal...@apache.org> knows more?
>
>> I used to define Avro records in .avro files and generate Java classes to
>> use in my Flink application, but recently decided to move to POJOs for some
>> reasons (e.g. custom Java annotations, etc).
>
> If it's just about annotations: you can use custom Java annotations with
> Avro generated classes by setting the "javaAnnotation" property. [1] You can
> use it on records and fields.
> You can even provide your own Velocity template [2] to add more features
> to the generation.
>
> [1] https://www.javacodegeeks.com/2020/07/it-is-never-enough-of-them-enriching-apache-avro-generated-classes-with-custom-java-annotations.html
> [2] https://github.com/apache/avro/blob/master/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/record.vm
>
> On Tue, Jan 12, 2021 at 11:16 AM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the very detailed explanation and tips!
>>
>>> inferring the type information of Java classes is quite messy. At first,
>>> it seems like that should work out of the box as you are only using <A> as the
>>> type of the list, right? However, there is no way of knowing if you didn't
>>> use a subclass of A. Of course, if A was final, it might be possible to
>>> exclude this case but you quickly go down a rabbit hole.
>>
>> Do you think Scala is a better option in that regard?
>>
>>> To avoid such limitations, I always recommend schema-first approaches
>>> when moving a PoC to production code. First figure out what kind of data
>>> you actually want to transfer. Then, settle on a serializer [2].
>>> Then, create the schema and let the classes be generated (on-the-fly).
>>> I usually do it in two ways: if I write a rather generic program, I try
>>> to use Table API, which optimizes everything in a Row and has one of the most
>>> memory-efficient representations.
>>
>> I have to work with the DataStream API as I need a custom trigger, which is
>> not supported in the Table API AFAIK.
>>
>>> If Table API is not sufficient, I fall back to Avro and use Avro specific
>>> records [3].
>>
>> I used to define Avro records in .avro files and generate Java classes to
>> use in my Flink application, but recently decided to move to POJOs for some
>> reasons (e.g. custom Java annotations, etc).
>>
>> So it seems like I'd better be satisfied with BTypeFactory in my original
>> question unless I'm willing to move to Scala or Avro, huh?
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Jan 8, 2021 at 6:22 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> inferring the type information of Java classes is quite messy. At first,
>>> it seems like that should work out of the box as you are only using <A> as the
>>> type of the list, right? However, there is no way of knowing if you didn't
>>> use a subclass of A. Of course, if A was final, it might be possible to
>>> exclude this case but you quickly go down a rabbit hole.
>>> It gets especially bad if you consider that your classes evolve over
>>> time. What happens if A is first final, but you later decide to subclass
>>> it? How should Flink map old data to the new hierarchy? Flink falls back to
>>> Kryo for most cases, which is why you need generic types. However, that is
>>> rather inefficient unless you register all possible classes beforehand [1].
>>>
>>> To avoid such limitations, I always recommend schema-first approaches
>>> when moving a PoC to production code. First figure out what kind of data
>>> you actually want to transfer. Then, settle on a serializer [2].
>>> Then, create the schema and let the classes be generated (on-the-fly).
>>>
>>> I usually do it in two ways: if I write a rather generic program, I try
>>> to use Table API, which optimizes everything in a Row and has one of the most
>>> memory-efficient representations. If Table API is not sufficient, I fall
>>> back to Avro and use Avro specific records [3].
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
>>> [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>>> [3] https://avro.apache.org/docs/current/gettingstartedjava.html
>>>
>>>
>>> On Mon, Jan 4, 2021 at 6:49 PM Dongwon Kim <eastcirc...@gmail.com>
>>> wrote:
>>>
>>>> Any advice would be appreciated :)
>>>>
>>>> Thanks,
>>>>
>>>> Dongwon
>>>>
>>>> ---------- Forwarded message ---------
>>>> From: Dongwon Kim <eastcirc...@gmail.com>
>>>> Date: Mon, Dec 14, 2020 at 11:27 PM
>>>> Subject: How to gracefully avoid "Generic types have been disabled in
>>>> the ExecutionConfig and type java.util.List is treated as a generic type"?
>>>> To: user <user@flink.apache.org>
>>>>
>>>>
>>>> Hi,
>>>>
>>>> The following program compiles and runs without exceptions:
>>>>
>>>>> import java.util.List;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>> public class Test {
>>>>>
>>>>>   // Plain POJO: public no-arg constructor plus getter/setter.
>>>>>   public static class A {
>>>>>     private int n;
>>>>>
>>>>>     public A() { }
>>>>>     public int getN() { return n; }
>>>>>     public void setN(int n) { this.n = n; }
>>>>>   }
>>>>>
>>>>>   // POJO whose List<A> field is the one reported as a generic type.
>>>>>   public static class B {
>>>>>     private List<A> lst;
>>>>>
>>>>>     public B() { }
>>>>>     public List<A> getLst() { return lst; }
>>>>>     public void setLst(List<A> lst) { this.lst = lst; }
>>>>>   }
>>>>>
>>>>>   public static void main(String[] args) throws Exception {
>>>>>     StreamExecutionEnvironment env =
>>>>>       StreamExecutionEnvironment.createLocalEnvironment();
>>>>>
>>>>>     env.fromElements(new B())
>>>>>       .print();
>>>>>
>>>>>     env.execute();
>>>>>   }
>>>>> }
>>>>>
>>>>
>>>> When I add the following line,
>>>>
>>>>> env.getConfig().disableGenericTypes();
>>>>
>>>> then the program shows me an exception:
>>>>
>>>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>>>> Generic types have been disabled in the ExecutionConfig and type
>>>>> java.util.List is treated as a generic type.
>>>>>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
>>>>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:319)
>>>>>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:311)
>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:970)
>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromElements(StreamExecutionEnvironment.java:871)
>>>>>     at Test.main(Test.java:29)
>>>>
>>>>
>>>> To avoid this exception, I found that I have to declare a type factory
>>>> like:
>>>>
>>>>> // Imports assumed for this snippet; ImmutableMap is taken to be Guava's.
>>>>> import com.google.common.collect.ImmutableMap;
>>>>> import java.lang.reflect.Type;
>>>>> import java.util.Map;
>>>>> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>>> import org.apache.flink.api.common.typeinfo.Types;
>>>>>
>>>>> public static class BTypeFactory extends TypeInfoFactory<B> {
>>>>>   @Override
>>>>>   public TypeInformation<B> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
>>>>>     // Describe B as a POJO and spell out the element type of "lst".
>>>>>     return Types.POJO(
>>>>>       B.class,
>>>>>       ImmutableMap.<String, TypeInformation<?>>builder()
>>>>>         .put("lst", Types.LIST(Types.POJO(A.class)))
>>>>>         .build()
>>>>>     );
>>>>>   }
>>>>> }
>>>>
>>>> and give it to class B as follows:
>>>>
>>>>> @TypeInfo(BTypeFactory.class)
>>>>> public static class B {
>>>>
>>>>
>>>> Is there no other way but to declare BTypeFactory in such cases?
>>>> I don't like having to type each field name twice, once for the
>>>> member variable and once for the Map entry in TypeInfoFactory.
>>>>
>>>> Thanks in advance,
>>>>
>>>> Dongwon
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
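
For anyone reading this thread later: the point about not knowing whether a subclass of A ends up in the list can be reproduced with plain Java reflection, without Flink. The sketch below is only an illustration under stated assumptions. The class name ListElementTypeDemo and the subclass SubA are hypothetical and do not appear in the original program, and the fields are public only to keep the example short. It compares the element type declared on the field with the class of an element actually stored at runtime.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ListElementTypeDemo {

    public static class A {
        public int n;
    }

    // Hypothetical subclass, introduced only for this illustration.
    public static class SubA extends A {
        public String extra = "x";
    }

    public static class B {
        public List<A> lst = new ArrayList<>();
    }

    /** Element type declared on B.lst, read from the field's generic signature. */
    public static Class<?> declaredElementType() {
        try {
            Field field = B.class.getDeclaredField("lst");
            ParameterizedType listType = (ParameterizedType) field.getGenericType();
            Type elementType = listType.getActualTypeArguments()[0];
            return (Class<?>) elementType;
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException("B.lst is expected to exist", e);
        }
    }

    /** Class of the first element actually stored in the list at runtime. */
    public static Class<?> runtimeElementType(B b) {
        return b.lst.get(0).getClass();
    }

    public static void main(String[] args) {
        B b = new B();
        b.lst.add(new SubA()); // legal: a SubA is an A

        System.out.println("declared: " + declaredElementType().getSimpleName());
        System.out.println("runtime:  " + runtimeElementType(b).getSimpleName());
    }
}
```

The declared type comes back as A while the stored element is a SubA, so a serializer chosen from the field signature alone would not know about the extra field. That is the gap an explicit type factory or a schema-first format such as Avro closes by stating the element type up front.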