Hello Alexis,

Thank you for sharing these helper classes, but unfortunately I have no idea
how to use them or how they might help me. This is all very new to me, and I
honestly can't wrap my head around Flink's type information system.

Best regards,
Saleh.

> On 14 Aug 2023, at 4:05 PM, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
> 
> Hello,
> 
> AFAIK you cannot avoid a TypeInfoFactory due to type erasure; there's nothing
> Flink can do about that. Here's an example of helper classes I've been using
> to support set serde in Flink POJOs. Note that it's hardcoded for
> LinkedHashSet, so you would have to create different implementations if you
> need to differentiate sorted sets:
> 
> https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398 
> 
> Regards,
> Alexis.
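Alexis's point about type erasure can be seen in plain Java, without Flink. In the minimal sketch below (the `ErasureDemo` and `Pojo` classes and the `declaredType` helper are hypothetical, made up for illustration), two sets with different element types share a single runtime class, so the element type cannot be read off the object itself. It survives only in declarations such as a field's generic type. Anything that serializes such a field efficiently therefore has to be told, or has to work out from the declaration, what the element type is; per Alexis, in Flink that means a TypeInfoFactory.

```java
import java.util.HashSet;
import java.util.Set;

public class ErasureDemo {
    // Hypothetical stand-in for a POJO with a set-valued field.
    public static class Pojo {
        public HashSet<Integer> set = new HashSet<>();
    }

    // Returns the declared (generic) type of a public field as a string.
    static String declaredType(Class<?> owner, String fieldName) {
        try {
            return owner.getField(fieldName).getGenericType().getTypeName();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field named " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Set<Integer> ints = new HashSet<>();
        Set<String> strings = new HashSet<>();

        // Both objects share one runtime class: the type argument is erased.
        System.out.println(ints.getClass() == strings.getClass()); // true
        System.out.println(ints.getClass().getName());            // java.util.HashSet

        // The element type survives only in declarations, such as a field's generic type.
        System.out.println(declaredType(Pojo.class, "set"));      // java.util.HashSet<java.lang.Integer>
    }
}
```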
> 
> 
> On Mon, 14 Aug 2023 at 12:14, <s...@sammar.sa> wrote:
> Hi,
> 
> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
> ```
> package com.example;
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.TreeSet;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class App {
>     public static class Pojo {
>         public ArrayList<Integer> list;
>         public HashSet<Integer> set;
>         public TreeSet<Integer> treeset;
>         public Pojo() {
>             this.list = new ArrayList<>();
>             this.set = new HashSet<>();
>             this.treeset = new TreeSet<>();
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableGenericTypes();
>         env.fromElements(new Pojo()).print();
>         env.execute("Pipeline");
>     }
> }
> ```
> 
> The output when running it:
> ```
> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class java.util.ArrayList does not contain a setter for field size
> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Field Pojo#list will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - No fields were detected for class java.util.HashSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Field Pojo#set will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - No fields were detected for class java.util.TreeSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Field Pojo#sset will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar) to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.ArrayList is treated as a generic type.
>         at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>         at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
>         at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
>         at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
>         at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
>         at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
>         at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>         at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>         at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>         at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>         at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>         at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
>         at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>         at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
>         at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
>         at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>         at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)
>         at com.example.App.main(App.java:26)
> ```
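The two kinds of messages in this log ("does not contain a setter for field size" and "No fields were detected") can be roughly reproduced with plain reflection. The sketch below assumes, as an approximation of Flink's POJO analysis, that static and transient fields are ignored; the `PojoFieldScan` class and `candidateFields` method are hypothetical. It also depends on OpenJDK's private field layout, which is an implementation detail rather than an API.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.TreeSet;

public class PojoFieldScan {
    // Names of instance fields that are neither static nor transient, walking up the
    // class hierarchy (an approximation of the fields a POJO analysis would consider).
    static List<String> candidateFields(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue; // assumed to be skipped, as they are not part of the POJO state
                }
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        for (Class<?> c : List.of(ArrayList.class, HashSet.class, TreeSet.class)) {
            System.out.println(c.getName() + " -> " + candidateFields(c));
        }
    }
}
```

On recent OpenJDK versions this should report only `size` for `ArrayList` (a private field with no setter) and no fields at all for `HashSet` and `TreeSet`, whose contents live in transient fields. So these JDK collection classes fall back to GenericType even when the enclosing class is a valid POJO.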
> 
> Best regards,
> Saleh.
> 
> 
>> On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user <user@flink.apache.org> wrote:
>> 
>> Hi Saleh,
>> 
>> If you could show us a minimal code example of the issue (the event classes),
>> I think someone could help you solve it.
>> 
>> Best regards,
>> Alexey
>> 
>> On Mon, Aug 14, 2023 at 9:23 AM <s...@sammar.sa> wrote:
>> Hi,
>> 
>> According to this blog post 
>> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
>> the "Must be processed as GenericType" message means that the POJO
>> serializer will not be used and Kryo will be used instead.
>> 
>> I created a simple POJO with a Java collection field to test this again, but
>> I got the same message. Disabling generic types throws an exception.
>> 
>> I'm not sure how to use these types along with the POJO serializer or any 
>> other fast serializer.
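What a dedicated serializer for such a field does differently can be sketched without Flink at all. The plain-Java example below is an illustration only, not Flink's API: the `IntSetCodec` class and its methods are hypothetical, and it assumes a `Set<Integer>` with no null elements. Because the codec fixes both the element type (`int`) and the concrete collection (`TreeSet`) up front, it only needs to write a count followed by the elements; roughly, this is the layout Flink's built-in `ListSerializer` uses for `java.util.List`. Wiring such a format into Flink (a custom `TypeSerializer`, its `TypeInformation`, and a TypeInfoFactory) is a separate, larger step that this sketch does not attempt.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class IntSetCodec {
    // Writes the element count, then each element as a 4-byte int.
    // Unboxing means null elements are not supported (NullPointerException).
    static byte[] write(Set<Integer> set) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(set.size());
            for (int value : set) {
                out.writeInt(value);
            }
        } catch (IOException e) {
            // Not expected for an in-memory stream, but DataOutputStream declares it.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the count, then that many ints. The concrete collection (TreeSet) is
    // fixed by the codec, just as the element type is.
    static SortedSet<Integer> readTreeSet(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            SortedSet<Integer> result = new TreeSet<>();
            for (int i = 0; i < size; i++) {
                result.add(in.readInt());
            }
            return result;
        } catch (IOException e) {
            // Thrown (as an EOFException) if the data is truncated.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SortedSet<Integer> original = new TreeSet<>(Set.of(3, 1, 2));
        byte[] encoded = write(original);
        System.out.println(encoded.length);       // 16: a 4-byte count plus three 4-byte ints
        System.out.println(readTreeSet(encoded)); // [1, 2, 3]
    }
}
```

Reading back into a fixed concrete type is also why a serializer hardcoded for one set implementation needs a sibling implementation for another (for example, a sorted set).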
>> 
>> Best regards,
>> Saleh.
>> 
>> 
>> 
>>> On 14 Aug 2023, at 4:59 AM, liu ron <ron9....@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> According to the test in [1], I think Flink can recognize a POJO class that
>>> contains a Java List, so you could refer to the related POJO class
>>> implementation there.
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>> 
>>> Best,
>>> Ron
>>> 
>>> On Sun, 13 Aug 2023 at 22:50, <s...@sammar.sa> wrote:
>>> Greetings,
>>> 
>>> I am working on a project that needs to process around 100k events per 
>>> second and I'm trying to improve performance.
>>> 
>>> Most of the classes being used are POJOs, but they have a couple of fields
>>> using a `java.util` class such as `ArrayList`, `HashSet` or `SortedSet`. This
>>> forces Flink to use Kryo and log these warnings:
>>> 
>>> ```
>>> class java.util.ArrayList does not contain a setter for field size
>>> Class class java.util.ArrayList cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
>>> ```
>>> 
>>> ```
>>> No fields were detected for class java.util.HashSet so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
>>> ```
>>> 
>>> My question is: what do I need to do to get Flink to recognize my classes as
>>> POJOs and use the POJO serializer for better performance?
>>> I read through the documentation and Stack Overflow, and the conclusion is
>>> that I need to write a `TypeInfoFactory` and use it in a `@TypeInfo`
>>> annotation on my POJO.
>>> While this seems incredibly tedious and I keep thinking "there must be a 
>>> better way", I would be fine with this solution if I could figure out how 
>>> to do this for the Set types I'm using.
>>> 
>>> Any help would be appreciated.
>> 
> 
