Antoine Michaud created FLINK-26470:

             Summary: [Java][TypeExtractor] Missing type information in POJO types of some types (List, Map, UUID)
                 Key: FLINK-26470
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.13.2
            Reporter: Antoine Michaud

h2. Problem:
h4. Basic collections (List, Map) and custom types are not compatible with Flink POJO serialization.
h2. Explanation:

As the documentation says, Kryo should not be used in production because its performance is poor.

To stop using Kryo and use Flink's native POJO serialization instead, we do this:

But POJOs have to meet [some

Consider the following code from TypeExtractor in flink-core v1.13.2 (it looks the same in v1.14.4):
{code:java}
private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
        Class<OUT> clazz,
        List<Type> typeHierarchy,
        ParameterizedType parameterizedType,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type) {

    // check if type information can be produced using a factory
    final TypeInformation<OUT> typeFromFactory =
            createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
    if (typeFromFactory != null) {
        return typeFromFactory;
    }

    // Object is handled as generic type info
    if (clazz.equals(Object.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // Class is handled as generic type info
    if (clazz.equals(Class.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // recursive types are handled as generic type info
    if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
        return new GenericTypeInfo<>(clazz);
    }

    // check for arrays
    if (clazz.isArray()) {

        // primitive arrays: int[], byte[], ...
        PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo =
                PrimitiveArrayTypeInfo.getInfoFor(clazz);
        if (primitiveArrayInfo != null) {
            return primitiveArrayInfo;
        }

        // basic type arrays: String[], Integer[], Double[]
        BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
        if (basicArrayInfo != null) {
            return basicArrayInfo;
        }

        // object arrays
        else {
            TypeInformation<?> componentTypeInfo =
                    createTypeInfoWithTypeHierarchy(
                            typeHierarchy, clazz.getComponentType(), in1Type, in2Type);

            return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
        }
    }

    // check for writable types
    if (isHadoopWritable(clazz)) {
        return createHadoopWritableTypeInfo(clazz);
    }

    // check for basic types
    TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
    if (basicTypeInfo != null) {
        return basicTypeInfo;
    }

    // check for SQL time types
    TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
    if (timeTypeInfo != null) {
        return timeTypeInfo;
    }

    // check for subclasses of Value
    if (Value.class.isAssignableFrom(clazz)) {
        Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
        return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
    }

    // check for subclasses of Tuple
    if (Tuple.class.isAssignableFrom(clazz)) {
        if (clazz == Tuple0.class) {
            return new TupleTypeInfo(Tuple0.class);
        }
        throw new InvalidTypesException(
                "Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
    }

    // check for Enums
    if (Enum.class.isAssignableFrom(clazz)) {
        return new EnumTypeInfo(clazz);
    }

    // special case for POJOs generated by Avro.
    if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
        return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
    }

    if (Modifier.isInterface(clazz.getModifiers())) {
        // Interface has no members and is therefore not handled as POJO
        return new GenericTypeInfo<>(clazz);
    }

    try {
        Type t = parameterizedType != null ? parameterizedType : clazz;
        TypeInformation<OUT> pojoType =
                analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
        if (pojoType != null) {
            return pojoType;
        }
    } catch (InvalidTypesException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Unable to handle type " + clazz + " as POJO. Message: " + e.getMessage(),
                    e);
        }
        // ignore and create generic type info
    }

    // return a generic type
    return new GenericTypeInfo<>(clazz);
}
{code}
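
The branch order above can be condensed into a small, dependency-free model to see where a given class ends up. BranchOrderModel and classify are hypothetical names, not Flink code: the factory, Value, Tuple, Hadoop, SQL time and Avro branches are omitted, the set of basic types is abbreviated, and the POJO step is reduced to the public no-argument constructor rule. It is only a sketch under those assumptions, but it reproduces why List and Map (interfaces) and UUID (no public no-argument constructor) end up as GenericTypeInfo.

{code:java}
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * Hypothetical, heavily simplified model of the branch order in
 * TypeExtractor#privateGetForClass. It returns only a label for the kind of
 * TypeInformation a class would get; it is not Flink code.
 */
class BranchOrderModel {

    // Abbreviated stand-in for the classes BasicTypeInfo recognizes (assumption).
    private static final Set<Class<?>> BASIC = Set.of(
            String.class, Boolean.class, Byte.class, Short.class, Integer.class,
            Long.class, Float.class, Double.class, Character.class,
            BigInteger.class, BigDecimal.class, Date.class, Instant.class);

    static String classify(Class<?> clazz) {
        // Object and Class are always generic
        if (clazz.equals(Object.class) || clazz.equals(Class.class)) {
            return "GenericTypeInfo";
        }
        if (clazz.isArray()) {
            return "array TypeInfo";
        }
        if (clazz.isPrimitive() || BASIC.contains(clazz)) {
            return "BasicTypeInfo";
        }
        if (Enum.class.isAssignableFrom(clazz)) {
            return "EnumTypeInfo";
        }
        // java.util.List and java.util.Map are interfaces, so they stop here
        if (Modifier.isInterface(clazz.getModifiers())) {
            return "GenericTypeInfo";
        }
        // POJO analysis, reduced to the public no-arg constructor rule;
        // a class failing it falls through to the generic fallback
        return hasPublicNoArgConstructor(clazz) ? "POJO analysis" : "GenericTypeInfo";
    }

    private static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor(); // finds public constructors only
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    enum Color { RED }

    public static void main(String[] args) {
        Class<?>[] samples = {String.class, int[].class, Color.class, List.class, Map.class, UUID.class};
        for (Class<?> c : samples) {
            System.out.println(c.getSimpleName() + " -> " + classify(c));
        }
    }
}
{code}

Running main labels List, Map and UUID as GenericTypeInfo, which matches the behavior described in this issue.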

Only the following types are supported (i.e. not treated as GenericType):
 * All custom POJOs annotated with @TypeInfo
 * arrays
 * All Hadoop Writable types
 * basic types (String, BigInteger/BigDecimal, Instant/Date, boxed primitives)
 * primitive types
 * SQL time types
 * All types implementing org.apache.flink.types.Value
 * All types implementing
 * enums
 * Avro types
 * nested POJOs

But these are not:
 * List and Map, since they fall into
 * UUID, since it is not a valid POJO (not all of its fields have a getter and a setter) and is therefore treated as a generic type
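
To see exactly which POJO rules UUID breaks, the rules from the Flink documentation (public standalone class, public no-argument constructor, every non-static, non-transient field either public and non-final or accessible through a getter and a setter) can be checked with plain reflection. PojoRuleChecker is a hypothetical helper and only a simplification of what analyzePojo does: it recognizes just the plain getX/isX and setX naming and ignores the rules on field types.

{code:java}
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Hypothetical checker for the documented POJO rules. A sketch for
 * illustration, not Flink's own analyzePojo.
 */
class PojoRuleChecker {

    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        try {
            clazz.getConstructor(); // public no-argument constructor only
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk the hierarchy: inherited fields are serialized as well
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod)) {
                    continue; // not part of the serialized state
                }
                if (Modifier.isPublic(mod)) {
                    if (Modifier.isFinal(mod)) {
                        problems.add("field '" + f.getName() + "' is public but final");
                    }
                    continue;
                }
                if (!hasGetterAndSetter(clazz, f)) {
                    problems.add("field '" + f.getName() + "' is not public and has no getter/setter");
                }
            }
        }
        return problems;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            if (m.getParameterCount() == 0
                    && (m.getName().equals("get" + suffix) || m.getName().equals("is" + suffix))
                    && m.getReturnType() == f.getType()) {
                getter = true;
            }
            if (m.getParameterCount() == 1
                    && m.getName().equals("set" + suffix)
                    && m.getParameterTypes()[0] == f.getType()) {
                setter = true;
            }
        }
        return getter && setter;
    }

    /** A class that follows the rules: public field plus a private field with getter/setter. */
    public static class Person {
        public String name;
        private int age;

        public Person() {}

        public int getAge() { return age; }

        public void setAge(int age) { this.age = age; }
    }

    public static void main(String[] args) {
        System.out.println("Person: " + violations(Person.class));
        System.out.println("UUID:   " + violations(UUID.class));
    }
}
{code}

For UUID the checker reports the missing public no-argument constructor and the private final mostSigBits and leastSigBits fields (names as in the OpenJDK sources), which are only exposed through getMostSignificantBits() and getLeastSignificantBits() and have no setters.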

{quote}By the way, we cannot register our own custom serializer either, which would really be ideal (the @TypeInfo documentation mentions TypeExtractor#registerFactory(Type, Class), but there isn't one).{quote}

h3. How to fix it?

ListTypeInfo and MapTypeInfo already exist and could simply be used by TypeExtractor.privateGetForClass(...).
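
The type arguments such a fix needs are available at extraction time: privateGetForClass already receives a ParameterizedType, and for a POJO field the same information comes from Field#getGenericType(). The sketch below (hypothetical CollectionTypeSketch, with strings standing in for real TypeInformation objects) shows the recursive mapping a fix could perform. Raw List/Map types and wildcards still fall back to a generic type, since no element type is known for them.

{code:java}
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical sketch: map List<E> and Map<K, V> to descriptions of the
 * ListTypeInfo / MapTypeInfo that could be built instead of GenericTypeInfo.
 * Leaf classes are returned by simple name, standing for "whatever
 * privateGetForClass would return for that class".
 */
class CollectionTypeSketch {

    static String describe(Type type) {
        if (type instanceof ParameterizedType) {
            ParameterizedType p = (ParameterizedType) type;
            Class<?> raw = (Class<?>) p.getRawType();
            Type[] args = p.getActualTypeArguments();
            if (raw == List.class) {
                return "ListTypeInfo<" + describe(args[0]) + ">";
            }
            if (raw == Map.class) {
                return "MapTypeInfo<" + describe(args[0]) + ", " + describe(args[1]) + ">";
            }
            return "GenericTypeInfo<" + raw.getSimpleName() + ">";
        }
        if (type instanceof Class) {
            Class<?> c = (Class<?>) type;
            // Raw List/Map without type arguments: element types are unknown
            if (c == List.class || c == Map.class) {
                return "GenericTypeInfo<" + c.getSimpleName() + ">";
            }
            return c.getSimpleName();
        }
        // Wildcards and type variables: no concrete element type available
        return "GenericTypeInfo<" + type.getTypeName() + ">";
    }

    static String describeField(Class<?> owner, String name) {
        try {
            return describe(owner.getField(name).getGenericType());
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("no public field " + name, e);
        }
    }

    /** Example POJO with collection fields. */
    public static class Event {
        public List<String> tags;
        public Map<String, List<Long>> counts;
        public UUID id;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"tags", "counts", "id"}) {
            System.out.println(name + " -> " + describeField(Event.class, name));
        }
    }
}
{code}

Nested collections such as Map<String, List<Long>> are handled by the same recursion, which mirrors how privateGetForClass already recurses into array component types.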

For UUID, we could create a customizable TypeInformationFactory that contains everything that does not fit the native Flink types. Alternatively, UUID could be added as a BasicType.
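
Both UUID options come down to giving UUID a dedicated serializer instead of the Kryo fallback. Since a UUID is just two longs, a fixed 16-byte encoding is enough. The sketch below uses a hypothetical user-extensible registry (Codec and CodecRegistrySketch are illustrative names, not Flink APIs) that is consulted before any generic fallback; in Flink the real equivalent would be a TypeInformation/TypeSerializer pair, which this plain-Java code does not implement.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical registry of per-type binary codecs, consulted before a generic fallback. */
class CodecRegistrySketch {

    /** Minimal binary codec contract (illustrative, not a Flink interface). */
    interface Codec<T> {
        void write(T value, DataOutput out) throws IOException;

        T read(DataInput in) throws IOException;
    }

    /** A UUID is exactly two longs, so a fixed 16-byte encoding is enough. */
    static final Codec<UUID> UUID_CODEC = new Codec<UUID>() {
        @Override
        public void write(UUID value, DataOutput out) throws IOException {
            out.writeLong(value.getMostSignificantBits());
            out.writeLong(value.getLeastSignificantBits());
        }

        @Override
        public UUID read(DataInput in) throws IOException {
            long most = in.readLong();
            long least = in.readLong();
            return new UUID(most, least);
        }
    };

    private static final Map<Class<?>, Codec<?>> REGISTRY = new HashMap<>();

    static {
        // User-supplied registrations; unregistered types would use the generic fallback
        register(UUID.class, UUID_CODEC);
    }

    static <T> void register(Class<T> type, Codec<T> codec) {
        REGISTRY.put(type, codec);
    }

    /** Returns the registered codec, or null when the caller should fall back. */
    @SuppressWarnings("unchecked")
    static <T> Codec<T> lookup(Class<T> type) {
        return (Codec<T>) REGISTRY.get(type);
    }

    static <T> byte[] encode(Class<T> type, T value) {
        Codec<T> codec = requireCodec(type);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            codec.write(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static <T> T decode(Class<T> type, byte[] data) {
        Codec<T> codec = requireCodec(type);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return codec.read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static <T> Codec<T> requireCodec(Class<T> type) {
        Codec<T> codec = lookup(type);
        if (codec == null) {
            throw new IllegalStateException("No codec registered for " + type.getName());
        }
        return codec;
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        byte[] data = encode(UUID.class, id);
        System.out.println(data.length + " bytes, round trip ok: " + id.equals(decode(UUID.class, data)));
    }
}
{code}

Every UUID encodes to exactly 16 bytes and decodes back to an equal value; a lookup for a type with no registration returns null, which is where the generic fallback would take over.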


_+I can help contribute!+_


Thanks!

