Thanks for your replies. I agree this is a somewhat general problem. I posted it here as I was trying to register the valid subclasses in Kryo but I couldn't get the message to go away, i.e., everything worked correctly but there was the complaint that GenericType serialization was being used.
This is how I was registering these types: env.getConfig.registerKryoType(classOf[java.lang.Integer]) env.getConfig.registerKryoType(classOf[java.lang.Double]) and this is the message I got on every event: flink-task-manager_1 | 2021-04-23 16:48:29.274 [Processor Function 1 (1/2)#0] INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class java.lang.Number so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. In the meanwhile, I've changed my approach to reuse a protobuf type I already had as part of my input event. Once again, thanks for your replies because they gave me the right perspective. Arvid Heise <ar...@apache.org> escreveu no dia quarta, 21/04/2021 à(s) 18:26: > Hi Miguel, > > as Klemens said this is a rather general problem independent of Flink: How > do you map Polymorphism in serialization? > > Flink doesn't have an answer on its own, as it's discouraged (A Number can > have arbitrary many subclasses: how do you distinguish them except by > classname? That adds a ton of overhead.). The easiest solution in your case > is to convert ints into double. > Or you use Kryo which dictionary encodes the classes and also limits the > possible subclasses. > > On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann < > klemens.muthm...@cyface.de> wrote: > >> Hi, >> >> I guess this is more of a Java Problem than a Flink Problem. If you want >> it quick and dirty you could implement a class such as: >> >> public class Value { >> private boolean isLongSet = false; >> private long longValue = 0L; >> private boolean isIntegerSet = false; >> private int intValue = 0; >> >> public Value(final long value) { >> setLong(value); >> } >> >> public void setLong(final long value) | >> longValue = value; >> isLongSet = true; >> } >> >> public long getLong() { >> if(isLongSet) { >> return longValue >> } >> } >> >> // Add same methods for int >> // to satisfy POJO requirements you will also need to add a >> no-argument constructor as well as getters and setters for the boolean flags >> } >> >> I guess a cleaner solution would be possible using a custom Kryo >> serializer as explained here: >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html >> >> Regards >> Klemens >> >> >> >> > Am 20.04.2021 um 10:34 schrieb Miguel Araújo <miguelaraujo...@gmail.com >> >: >> > >> > Hi everyone, >> > >> > I have a ProcessFunction which needs to store different number types >> for different keys, e.g., some keys need to store an integer while others >> need to store a double. >> > >> > I tried to use java.lang.Number as the type for the ValueState, but I >> got the expected "No fields were detected for class java.lang.Number so it >> cannot be used as a POJO type and must be processed as GenericType." >> > >> > I have the feeling that this is not the right approach, but the exact >> type to be stored is only known at runtime which makes things a bit >> trickier. Is there a way to register these classes correctly, or Is it >> preferable to use different ValueState's for different types? >> > >> > Thanks, >> > Miguel >> >>