Hello Alexis,

Thank you for sharing the helper classes, but unfortunately I have no idea how to use them or how they might help me. This is all very new to me and I honestly can't wrap my head around Flink's type information system.
Best regards,
Saleh.

> On 14 Aug 2023, at 4:05 PM, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
> Hello,
>
> AFAIK you cannot avoid TypeInfoFactory because of type erasure; there is nothing Flink can do about that. Here's an example of helper classes I've been using to support set serde in Flink POJOs, but note that it's hardcoded for LinkedHashSet, so you would have to create different implementations if you need to differentiate sorted sets:
>
> https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
>
> Regards,
> Alexis.
>
> On Mon, 14 Aug 2023 at 12:14, <s...@sammar.sa> wrote:
> Hi,
>
> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
> ```
> package com.example;
>
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.TreeSet;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class App {
>     public static class Pojo {
>         public ArrayList<Integer> list;
>         public HashSet<Integer> set;
>         public TreeSet<Integer> treeset;
>
>         public Pojo() {
>             this.list = new ArrayList<>();
>             this.set = new HashSet<>();
>             this.treeset = new TreeSet<>();
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Fail instead of falling back to the generic (Kryo) serializer.
>         env.getConfig().disableGenericTypes();
>         env.fromElements(new Pojo()).print();
>         env.execute("Pipeline");
>     }
> }
> ```
>
> The result of running:
> ```
> 13:08:20,074 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class java.util.ArrayList does not contain a setter for field size
> 13:08:20,077 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,078 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#list will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - No fields were detected for class java.util.HashSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#set will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - No fields were detected for class java.util.TreeSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#sset will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar) to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.ArrayList is treated as a generic type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
>     at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>     at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>     at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>     at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>     at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)
>     at com.example.App.main(App.java:26)
> ```
>
> Best regards,
> Saleh.
>
>> On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user <user@flink.apache.org> wrote:
>>
>> Hi Saleh,
>>
>> If you could show us a minimal code example of the issue (the event classes), I think someone could help you solve it.
>>
>> Best regards,
>> Alexey
>>
>> On Mon, Aug 14, 2023 at 9:23 AM <s...@sammar.sa> wrote:
>> Hi,
>>
>> According to this blog post
>> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
>> the "must be processed as GenericType" message means that the POJO serializer will not be used and Kryo will be used instead.
>>
>> I created a simple POJO to test it again with a Java Collection, but I got the same message. Disabling generic types throws an exception.
>>
>> I'm not sure how to use these types along with the POJO serializer or any other fast serializer.
>>
>> Best regards,
>> Saleh.
>>
>>> On 14 Aug 2023, at 4:59 AM, liu ron <ron9....@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> According to the test in [1], I think Flink can recognize a POJO class that contains a Java List, so I think you can refer to the related POJO class implementation.
>>>
>>> [1] https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>>
>>> Best,
>>> Ron
>>>
>>> On Sun, 13 Aug 2023 at 22:50, <s...@sammar.sa> wrote:
>>> Greetings,
>>>
>>> I am working on a project that needs to process around 100k events per second and I'm trying to improve performance.
>>>
>>> Most of the classes being used are POJOs, but a couple of their fields use a `java.util` class, such as `ArrayList`, `HashSet` or `SortedSet`. This forces Flink to use Kryo and log these warnings:
>>>
>>> ```
>>> class java.util.ArrayList does not contain a setter for field size
>>> Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
>>> ```
>>>
>>> ```
>>> No fields were detected for class java.util.HashSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
>>> ```
>>>
>>> My question is: what do I need to do to get Flink to recognize my classes as POJOs and use the POJO serializer for better performance?
>>>
>>> I read through the documentation and Stack Overflow, and the conclusion is that I need to write a `TypeInfoFactory` and reference it in a `@TypeInfo` annotation on my POJO.
>>>
>>> While this seems incredibly tedious and I keep thinking "there must be a better way", I would be fine with this solution if I could figure out how to do this for the Set types I'm using.
>>>
>>> Any help would be appreciated.
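
A note on the type-erasure point raised in the thread: the sketch below uses only the JDK (no Flink) to show what erasure removes and what it leaves behind. `ErasureDemo` and its method names are made up for illustration, and `Pojo` simply mirrors the class from the minimal example in the thread. It says nothing about how Flink's type extraction works internally, only about the Java behaviour any such mechanism has to work with.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.TreeSet;

// Hypothetical demo class; not part of Flink or of the thread's code.
final class ErasureDemo {

    // Same field layout as the Pojo in the thread's minimal example.
    public static class Pojo {
        public ArrayList<Integer> list = new ArrayList<>();
        public HashSet<Integer> set = new HashSet<>();
        public TreeSet<Integer> treeset = new TreeSet<>();
    }

    // Instances carry no element type: both lists have the same runtime class.
    static boolean sameRuntimeClass() {
        return new ArrayList<Integer>().getClass() == new ArrayList<String>().getClass();
    }

    // The field declaration, by contrast, keeps its type argument,
    // and reflection can read it back.
    static String declaredElementType(String fieldName) throws NoSuchFieldException {
        Field field = Pojo.class.getField(fieldName);
        Type generic = field.getGenericType();
        if (generic instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) generic).getActualTypeArguments()[0];
            return arg.getTypeName();
        }
        return "unknown";
    }

    public static void main(String[] args) throws Exception {
        System.out.println("same runtime class: " + sameRuntimeClass());
        for (String name : new String[] {"list", "set", "treeset"}) {
            System.out.println(name + " declared element type: " + declaredElementType(name));
        }
    }
}
```

The element type is therefore recoverable from the field declaration but not from a collection instance. That is consistent with the advice in the thread that the serializer for such fields has to be described explicitly, for example through a `TypeInfoFactory` referenced from `@TypeInfo`.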