Thanks for your replies. I agree this is a somewhat general problem.
I posted it here as I was trying to register the valid subclasses in Kryo
but I couldn't get the message to go away, i.e., everything worked
correctly but there was the complaint that GenericType serialization was
being used.

This is how I was registering these types:

env.getConfig.registerKryoType(classOf[java.lang.Integer])
env.getConfig.registerKryoType(classOf[java.lang.Double])

and this is the message I got on every event:

flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1
(1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No
fields were detected for class java.lang.Number so it cannot be used as a
POJO type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance.

In the meanwhile, I've changed my approach to reuse a protobuf type I
already had as part of my input event.

Once again, thanks for your replies because they gave me the right
perspective.



Arvid Heise <ar...@apache.org> escreveu no dia quarta, 21/04/2021 à(s)
18:26:

> Hi Miguel,
>
> as Klemens said this is a rather general problem independent of Flink: How
> do you map Polymorphism in serialization?
>
> Flink doesn't have an answer on its own, as it's discouraged (A Number can
> have arbitrary many subclasses: how do you distinguish them except by
> classname? That adds a ton of overhead.). The easiest solution in your case
> is to convert ints into double.
> Or you use Kryo which dictionary encodes the classes and also limits the
> possible subclasses.
>
> On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann <
> klemens.muthm...@cyface.de> wrote:
>
>> Hi,
>>
>> I guess this is more of a Java Problem than a Flink Problem. If you want
>> it quick and dirty you could implement a class such as:
>>
>> public class Value {
>>     private boolean isLongSet = false;
>>     private long longValue = 0L;
>>     private boolean isIntegerSet = false;
>>     private int intValue = 0;
>>
>>    public Value(final long value) {
>>        setLong(value);
>>    }
>>
>>     public void setLong(final long value) |
>>         longValue = value;
>>         isLongSet = true;
>>    }
>>
>>    public long getLong() {
>>        if(isLongSet) {
>>            return longValue
>>        }
>>    }
>>
>>    // Add same methods for int
>>    // to satisfy POJO requirements you will also need to add a
>> no-argument constructor as well as getters and setters for the boolean flags
>> }
>>
>> I guess a cleaner solution would be possible using a custom Kryo
>> serializer as explained here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>
>> Regards
>>       Klemens
>>
>>
>>
>> > Am 20.04.2021 um 10:34 schrieb Miguel Araújo <miguelaraujo...@gmail.com
>> >:
>> >
>> > Hi everyone,
>> >
>> > I have a ProcessFunction which needs to store different number types
>> for different keys, e.g., some keys need to store an integer while others
>> need to store a double.
>> >
>> > I tried to use java.lang.Number as the type for the ValueState, but I
>> got the expected "No fields were detected for class java.lang.Number so it
>> cannot be used as a POJO type and must be processed as GenericType."
>> >
>> > I have the feeling that this is not the right approach, but the exact
>> type to be stored is only known at runtime which makes things a bit
>> trickier. Is there a way to register these classes correctly, or Is it
>> preferable to use different ValueState's for different types?
>> >
>> > Thanks,
>> > Miguel
>>
>>

Reply via email to