Hi, Here's a minimal example using an ArrayList, a HashSet, and a TreeSet: ``` package com.example; import java.util.ArrayList; import java.util.HashSet; import java.util.TreeSet; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class App { public static class Pojo { public ArrayList<Integer> list; public HashSet<Integer> set; public TreeSet<Integer> treeset; public Pojo() { this.list = new ArrayList<>(); this.set = new HashSet<>(); this.treeset = new TreeSet<>(); } } public static void main(String[] args) throws Exception { var env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().disableGenericTypes(); env.fromElements(new Pojo()).print(); env.execute("Pipeline"); } } ```
The result of running: ``` 13:08:20,074 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class java.util.ArrayList does not contain a setter for field size 13:08:20,077 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution. 13:08:20,078 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#list will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution. 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - No fields were detected for class java.util.HashSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution. 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#set will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution. 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - No fields were detected for class java.util.TreeSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution. 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#sset will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution. WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar) to field java.lang.String.value WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.ArrayList is treated as a generic type. at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350) at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342) at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419) at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391) at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345) at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66) at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53) at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40) at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579) at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052) at com.example.App.main(App.java:26) ``` Best regards, Saleh. > On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user <user@flink.apache.org> > wrote: > > Hi Saleh, > > If you could show us the minimal code example of the issue (event classes), I > think someone could help you to solve it. > > Best regards, > Alexey > > On Mon, Aug 14, 2023 at 9:23 AM <s...@sammar.sa <mailto:s...@sammar.sa>> > wrote: > Hi, > > According to this blog post > https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer > > <https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer> > The "Must be processed as GenericType" message means that the POJO serializer > will not be used and instead, Kyro will be used. > > I created a simple POJO to test it again with a java Collection but I got the > same message. Disabling generic types throws an exception. > > I'm not sure how to use these types along with the POJO serializer or any > other fast serializer. > > Best regards, > Saleh. > > > >> On 14 Aug 2023, at 4:59 AM, liu ron <ron9....@gmail.com >> <mailto:ron9....@gmail.com>> wrote: >> >> Hi, >> >> According to the test in [1], I think Flink can recognize Pojo class which >> contains java List, so I think you can refer to the related Pojo class >> implementation. >> >> [1] >> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192 >> >> <https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192> >> >> Best, >> Ron >> >> <s...@sammar.sa <mailto:s...@sammar.sa>> 于2023年8月13日周日 22:50写道: >> Greetings, >> >> I am working on a project that needs to process around 100k events per >> second and I'm trying to improve performance. >> >> Most of the classes being used are POJOs but have a couple of fields using a >> `java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This >> forces Flink to use Kyro and throw these warnings: >> >> ``` >> class java.util.ArrayList does not contain a setter for field size >> Class class java.util.ArrayList cannot be used as a POJO type because not >> all fields are valid POJO fields, and must be processed as GenericType. >> Please read the Flink documentation on "Data Types & Serialization" for >> details of the effect on performance and schema evolution. >> ``` >> >> ``` >> No fields were detected for class java.util.HashSet so it cannot be used as >> a POJO type and must be processed as GenericType. Please read the Flink >> documentation on "Data Types & Serialization" for details of the effect on >> performance and schema evolution. >> I read through the documentation and stackoverflow and the conclusion is >> that I need to make a TypeInfoFactory and use it inside a TypeInfo >> annotation over my POJO. >> ``` >> >> My question is what do I need to do to get Flink to recognize my classes as >> POJOs and use the POJO serializer for better performance? >> I read through the documentation and stackoverflow and the conclusion is >> that I need to make a TypeInfoFactory and use it inside a TypeInfo >> annotation over my POJO. >> While this seems incredibly tedious and I keep thinking "there must be a >> better way", I would be fine with this solution if I could figure out how to >> do this for the Set types I'm using. >> >> Any help would be appreciated. >