Antoine Michaud created FLINK-26470:
---------------------------------------

             Summary: [Java][TypeExtractor] Missing type information in POJO types of some types (List, Map, UUID)
                 Key: FLINK-26470
                 URL: https://issues.apache.org/jira/browse/FLINK-26470
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.13.2
            Reporter: Antoine Michaud


h2. Problem:
h4. Basic collections (List, Map) and custom types are not compatible with Flink's POJO serialization.
h2. Explanation:

As the documentation says, we should not use Kryo in production since its performance is poor.

To stop using Kryo and use native POJO serialization, we do this:
{code:java}
env.getConfig().disableGenericTypes();{code}
 

But POJOs have to meet [some requirements|https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types].
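As a reminder of those rules, a minimal class that passes the POJO analysis could look like this (a stdlib-only sketch; the class name and fields are made up for illustration):

```java
// Hypothetical example of a class satisfying Flink's POJO rules:
// public class, public no-argument constructor, and every field either
// public or accessible through a getter/setter pair.
public class SensorReading {
    public String sensorId;           // public field: OK
    private long timestamp;           // private field with getter/setter: OK

    public SensorReading() {}         // public no-argument constructor

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}
```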

Consider the following code from flink-core v1.13.2 (it looks the same in v1.14.4):
{code:java}
private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
        Class<OUT> clazz,
        List<Type> typeHierarchy,
        ParameterizedType parameterizedType,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type) {
    checkNotNull(clazz);

    // check if type information can be produced using a factory
    final TypeInformation<OUT> typeFromFactory =
            createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
    if (typeFromFactory != null) {
        return typeFromFactory;
    }

    // Object is handled as generic type info
    if (clazz.equals(Object.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // Class is handled as generic type info
    if (clazz.equals(Class.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // recursive types are handled as generic type info
    if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
        return new GenericTypeInfo<>(clazz);
    }

    // check for arrays
    if (clazz.isArray()) {

        // primitive arrays: int[], byte[], ...
        PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo =
                PrimitiveArrayTypeInfo.getInfoFor(clazz);
        if (primitiveArrayInfo != null) {
            return primitiveArrayInfo;
        }

        // basic type arrays: String[], Integer[], Double[]
        BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
        if (basicArrayInfo != null) {
            return basicArrayInfo;
        }

        // object arrays
        else {
            TypeInformation<?> componentTypeInfo =
                    createTypeInfoWithTypeHierarchy(
                            typeHierarchy, clazz.getComponentType(), in1Type, in2Type);

            return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
        }
    }

    // check for writable types
    if (isHadoopWritable(clazz)) {
        return createHadoopWritableTypeInfo(clazz);
    }

    // check for basic types
    TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
    if (basicTypeInfo != null) {
        return basicTypeInfo;
    }

    // check for SQL time types
    TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
    if (timeTypeInfo != null) {
        return timeTypeInfo;
    }

    // check for subclasses of Value
    if (Value.class.isAssignableFrom(clazz)) {
        Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
        return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
    }

    // check for subclasses of Tuple
    if (Tuple.class.isAssignableFrom(clazz)) {
        if (clazz == Tuple0.class) {
            return new TupleTypeInfo(Tuple0.class);
        }
        throw new InvalidTypesException(
                "Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
    }

    // check for Enums
    if (Enum.class.isAssignableFrom(clazz)) {
        return new EnumTypeInfo(clazz);
    }

    // special case for POJOs generated by Avro.
    if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
        return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
    }

    if (Modifier.isInterface(clazz.getModifiers())) {
        // Interface has no members and is therefore not handled as POJO
        return new GenericTypeInfo<>(clazz);
    }

    try {
        Type t = parameterizedType != null ? parameterizedType : clazz;
        TypeInformation<OUT> pojoType =
                analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
        if (pojoType != null) {
            return pojoType;
        }
    } catch (InvalidTypesException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Unable to handle type " + clazz + " as POJO. Message: " + e.getMessage(),
                    e);
        }
        // ignore and create generic type info
    }

    // return a generic type
    return new GenericTypeInfo<>(clazz);
} {code}
 

Only the following types are compatible (i.e. not treated as GenericType):
 * custom POJOs with a @TypeInfo annotation
 * arrays
 * Hadoop Writable types
 * basic types (String, BigInteger/BigDecimal, Instant/Date, boxed primitives)
 * primitive types
 * SQL time types
 * classes implementing org.apache.flink.types.Value
 * classes extending org.apache.flink.api.java.tuple.Tuple
 * enums
 * Avro types
 * nested POJOs

But not:
 * List and Map, since they fall into the `Modifier.isInterface(clazz.getModifiers())` branch
 * UUID, since it fails the POJO analysis (its fields have no getters/setters) and falls back to GenericTypeInfo
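Both failure modes can be reproduced with plain reflection, the same mechanism the extractor relies on (stdlib-only sketch; the class and method names here are made up):

```java
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class PojoRuleCheck {
    // List and Map are interfaces, so privateGetForClass(...) bails out
    // into GenericTypeInfo before the POJO analysis even starts.
    static boolean isInterfaceType(Class<?> clazz) {
        return Modifier.isInterface(clazz.getModifiers());
    }

    // UUID keeps its state in non-public instance fields...
    static long publicInstanceFields(Class<?> clazz) {
        return Arrays.stream(clazz.getDeclaredFields())
                .filter(f -> !Modifier.isStatic(f.getModifiers()))
                .filter(f -> Modifier.isPublic(f.getModifiers()))
                .count();
    }

    // ...and exposes no setters, so the getter/setter rule cannot be met.
    static long setterMethods(Class<?> clazz) {
        return Arrays.stream(clazz.getMethods())
                .filter(m -> m.getName().startsWith("set"))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(isInterfaceType(List.class));        // true
        System.out.println(isInterfaceType(Map.class));         // true
        System.out.println(publicInstanceFields(UUID.class));   // 0
        System.out.println(setterMethods(UUID.class));          // 0
    }
}
```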

 
{quote}By the way, we cannot register a custom serializer, which would really be the perfect world: the @TypeInfo documentation says there is a TypeExtractor#registerFactory(Type, Class) method, but there isn't.
{quote}
 

 
h3. How to fix it?

ListTypeInfo and MapTypeInfo already exist and could simply be used by TypeExtractor.privateGetForClass(...).

For UUID, we can create a customizable TypeInfoFactory that contains all the specific handling that does not fit the native Flink libs. The other way is to add it as a BasicType.
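Until such a hook exists, the closest workaround today is the type-level @TypeInfo annotation on a user-owned wrapper (it cannot be placed on java.util.UUID itself). A sketch, assuming Flink's existing TypeInfoFactory SPI; UuidValue and UuidValueFactory are hypothetical names, and the returned type info is only a placeholder:

```java
import java.lang.reflect.Type;
import java.util.Map;
import java.util.UUID;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical wrapper: @TypeInfo on the type routes extraction to the factory.
@TypeInfo(UuidValueFactory.class)
public class UuidValue {
    public UUID value;
    public UuidValue() {}
}

// Hypothetical factory; a real fix would return a dedicated UuidTypeInfo whose
// serializer writes the UUID's two longs instead of going through Kryo.
class UuidValueFactory extends TypeInfoFactory<UuidValue> {
    @Override
    public TypeInformation<UuidValue> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Placeholder only: still generic here, shown just to wire up the SPI.
        return Types.GENERIC(UuidValue.class);
    }
}
```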

 

_+I can help to contribute!+_

 

Thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
