Hello,

AFAIK you cannot avoid a TypeInfoFactory here: because of type erasure, there is nothing Flink can do about it. Here's an example of helper classes I've been using to support Set serialization in Flink POJOs. Note that it's hardcoded for LinkedHashSet, so you would have to create separate implementations if you need to distinguish sorted sets:
https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398

Regards,
Alexis.

On Mon, 14 Aug 2023 at 12:14, <s...@sammar.sa> wrote:

> Hi,
>
> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
> ```
> package com.example;
>
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.TreeSet;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class App {
>     public static class Pojo {
>         public ArrayList<Integer> list;
>         public HashSet<Integer> set;
>         public TreeSet<Integer> treeset;
>
>         public Pojo() {
>             this.list = new ArrayList<>();
>             this.set = new HashSet<>();
>             this.treeset = new TreeSet<>();
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableGenericTypes();
>         env.fromElements(new Pojo()).print();
>         env.execute("Pipeline");
>     }
> }
> ```
>
> The result of running:
> ```
> 13:08:20,074 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class java.util.ArrayList does not contain a setter for field size
> 13:08:20,077 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,078 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#list will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - No fields were detected for class java.util.HashSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#set will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - No fields were detected for class java.util.TreeSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Field Pojo#sset will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar) to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.ArrayList is treated as a generic type.
>     at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
>     at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
>     at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>     at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>     at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>     at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>     at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)
>     at com.example.App.main(App.java:26)
> ```
>
> Best regards,
> Saleh.
>
> On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user <user@flink.apache.org> wrote:
>
> Hi Saleh,
>
> If you could show us a minimal code example of the issue (event classes), I think someone could help you solve it.
>
> Best regards,
> Alexey
>
> On Mon, Aug 14, 2023 at 9:23 AM <s...@sammar.sa> wrote:
>
>> Hi,
>>
>> According to this blog post
>> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
>> the "Must be processed as GenericType" message means that the POJO serializer will not be used and Kryo will be used instead.
>>
>> I created a simple POJO to test it again with a Java Collection, but I got the same message. Disabling generic types throws an exception.
>>
>> I'm not sure how to use these types along with the POJO serializer or any other fast serializer.
>>
>> Best regards,
>> Saleh.
>>
>> On 14 Aug 2023, at 4:59 AM, liu ron <ron9....@gmail.com> wrote:
>>
>> Hi,
>>
>> According to the test in [1], I think Flink can recognize a POJO class that contains a Java List, so I think you can refer to the related POJO class implementation.
>>
>> [1] https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>
>> Best,
>> Ron
>>
>> <s...@sammar.sa> wrote on Sun, 13 Aug 2023 at 22:50:
>>
>>> Greetings,
>>>
>>> I am working on a project that needs to process around 100k events per second and I'm trying to improve performance.
>>>
>>> Most of the classes being used are POJOs but have a couple of fields using a `java.util` class, either `ArrayList`, `HashSet` or `SortedSet` etc. This forces Flink to use Kryo and log these warnings:
>>>
>>> ```
>>> class java.util.ArrayList does not contain a setter for field size
>>> Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
>>> ```
>>>
>>> ```
>>> No fields were detected for class java.util.HashSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
>>> ```
>>>
>>> My question is what do I need to do to get Flink to recognize my classes as POJOs and use the POJO serializer for better performance?
>>> I read through the documentation and Stack Overflow and the conclusion is that I need to make a TypeInfoFactory and use it inside a TypeInfo annotation over my POJO.
>>> While this seems incredibly tedious and I keep thinking "there must be a better way", I would be fine with this solution if I could figure out how to do this for the Set types I'm using.
>>>
>>> Any help would be appreciated.
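
For anyone reading the thread later, here is a small JDK-only sketch of the type erasure mentioned at the top of this reply. It does not use Flink at all; the class name `ErasureDemo` and both helper methods are made up for illustration. It only shows that a set instance carries no element type at runtime, which is presumably why the element type has to be supplied explicitly (for example through a TypeInfoFactory).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class, not part of Flink or of the gist linked above.
final class ErasureDemo {

    // Returns the runtime class name of a set instance.
    // The element type is not part of this name.
    static String runtimeType(Set<?> set) {
        return set.getClass().getName();
    }

    // True when both objects have exactly the same runtime class.
    static boolean sameRuntimeClass(Object a, Object b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        Set<Integer> ints = new HashSet<>();
        Set<String> strings = new HashSet<>();
        ints.add(1);
        strings.add("a");

        System.out.println(runtimeType(ints));               // java.util.HashSet
        System.out.println(runtimeType(strings));            // java.util.HashSet
        System.out.println(sameRuntimeClass(ints, strings)); // true
    }
}
```

Both sets report the same runtime class even though their declared element types differ, so anything that inspects only the instance cannot tell a set of integers from a set of strings.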