Antoine Michaud created FLINK-26470:
---------------------------------------

             Summary: [Java][TypeExtractor] Missing type information in POJO types of some types (List, Map, UUID)
                 Key: FLINK-26470
                 URL: https://issues.apache.org/jira/browse/FLINK-26470
             Project: Flink
          Issue Type: Bug
          Components: API / Core
    Affects Versions: 1.13.2
            Reporter: Antoine Michaud


h2. Problem:
h4. Basic collections (List, Map) and custom types are not compatible with Flink POJO serialization.

h2. Explanation:

As the docs say, Kryo should not be used in production because it performs poorly. To stop falling back to Kryo and rely on the native POJO serialization, we do this:
{code:java}
env.getConfig().disableGenericTypes();
{code}
But POJOs have to meet [some requirements|https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types].

Consider the following code from flink-core v1.13.2 (it looks the same in v1.14.4):
{code:java}
private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
        Class<OUT> clazz,
        List<Type> typeHierarchy,
        ParameterizedType parameterizedType,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type) {
    checkNotNull(clazz);

    // check if type information can be produced using a factory
    final TypeInformation<OUT> typeFromFactory =
            createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
    if (typeFromFactory != null) {
        return typeFromFactory;
    }

    // Object is handled as generic type info
    if (clazz.equals(Object.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // Class is handled as generic type info
    if (clazz.equals(Class.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // recursive types are handled as generic type info
    if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
        return new GenericTypeInfo<>(clazz);
    }

    // check for arrays
    if (clazz.isArray()) {
        // primitive arrays: int[], byte[], ...
        PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo =
                PrimitiveArrayTypeInfo.getInfoFor(clazz);
        if (primitiveArrayInfo != null) {
            return primitiveArrayInfo;
        }

        // basic type arrays: String[], Integer[], Double[]
        BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
        if (basicArrayInfo != null) {
            return basicArrayInfo;
        }
        // object arrays
        else {
            TypeInformation<?> componentTypeInfo =
                    createTypeInfoWithTypeHierarchy(
                            typeHierarchy, clazz.getComponentType(), in1Type, in2Type);
            return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
        }
    }

    // check for writable types
    if (isHadoopWritable(clazz)) {
        return createHadoopWritableTypeInfo(clazz);
    }

    // check for basic types
    TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
    if (basicTypeInfo != null) {
        return basicTypeInfo;
    }

    // check for SQL time types
    TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
    if (timeTypeInfo != null) {
        return timeTypeInfo;
    }

    // check for subclasses of Value
    if (Value.class.isAssignableFrom(clazz)) {
        Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
        return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
    }

    // check for subclasses of Tuple
    if (Tuple.class.isAssignableFrom(clazz)) {
        if (clazz == Tuple0.class) {
            return new TupleTypeInfo(Tuple0.class);
        }
        throw new InvalidTypesException(
                "Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
    }

    // check for Enums
    if (Enum.class.isAssignableFrom(clazz)) {
        return new EnumTypeInfo(clazz);
    }

    // special case for POJOs generated by Avro.
    if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
        return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
    }

    if (Modifier.isInterface(clazz.getModifiers())) {
        // Interface has no members and is therefore not handled as POJO
        return new GenericTypeInfo<>(clazz);
    }

    try {
        Type t = parameterizedType != null ?
                parameterizedType : clazz;
        TypeInformation<OUT> pojoType =
                analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
        if (pojoType != null) {
            return pojoType;
        }
    } catch (InvalidTypesException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Unable to handle type " + clazz + " as POJO. Message: " + e.getMessage(),
                    e);
        }
        // ignore and create generic type info
    }

    // return a generic type
    return new GenericTypeInfo<>(clazz);
}
{code}
Only the following types are compatible (i.e. not treated as GenericType):
 * All custom POJOs annotated with @TypeInfo
 * arrays
 * All Hadoop writable types
 * basic types (String, BigInteger/BigDecimal, Instant/Date, boxed primitives)
 * primitive types
 * SQL time types
 * All types implementing org.apache.flink.types.Value
 * All subclasses of org.apache.flink.api.java.tuple.Tuple
 * enums
 * Avro types
 * nested POJOs

But not:
 * List, Map, since they fall into the `Modifier.isInterface(clazz.getModifiers())` branch
 * UUID, since it fails the POJO analysis (not all of its fields have a getter/setter) and falls back to a generic type

{quote}By the way, we cannot register our own custom serializer, which would be the ideal solution (the @TypeInfo documentation mentions TypeExtractor#registerFactory(Type, Class), but that method does not exist).
{quote}
h3. How to fix it?

ListTypeInfo and MapTypeInfo already exist and could simply be used by TypeExtractor.privateGetForClass(...).

For UUID, we could create a customisable TypeInformationFactory that holds all the specific types that do not fit the native Flink libraries. The other option is to add it as a BasicType.

_+I can help contribute!+_

Thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
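
The two failing cases described in the issue can be reproduced with plain JDK reflection, without Flink on the classpath. The sketch below is only an approximation: `PojoRuleCheck`, its helper methods, and `SamplePojo` are hypothetical names introduced for illustration, and the field check is a simplified reading of the POJO rules (declared fields only, `getX`/`setX` naming), not Flink's actual `analyzePojo` logic.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper, not part of Flink: approximates the checks discussed in the issue. */
class PojoRuleCheck {

    /** Hypothetical class that satisfies the simplified rules below. */
    public static class SamplePojo {
        public String name;
        public int count;

        public SamplePojo() {}
    }

    /** Same test as the Modifier.isInterface(clazz.getModifiers()) branch in the quoted method. */
    static boolean isInterface(Class<?> clazz) {
        return Modifier.isInterface(clazz.getModifiers());
    }

    /** Simplified rule: the class exposes a public no-argument constructor. */
    static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        for (Constructor<?> c : clazz.getConstructors()) {
            if (c.getParameterCount() == 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Simplified rule: every declared non-static, non-transient field is either
     * public and non-final, or has a public getX() and setX(type) pair.
     */
    static boolean fieldsAreAccessible(Class<?> clazz) {
        for (Field f : clazz.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue; // not part of the serialized state
            }
            if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                continue; // directly accessible
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasPublicMethod(clazz, "get" + suffix)
                    || !hasPublicMethod(clazz, "set" + suffix, f.getType())) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("List is an interface: " + isInterface(List.class));
        System.out.println("Map is an interface: " + isInterface(Map.class));
        System.out.println("UUID has a public no-arg constructor: "
                + hasPublicNoArgConstructor(UUID.class));
        System.out.println("UUID fields are accessible: " + fieldsAreAccessible(UUID.class));
        System.out.println("SamplePojo fields are accessible: "
                + fieldsAreAccessible(SamplePojo.class));
    }
}
```

Under these simplified rules, `List` and `Map` are caught by the interface check before any POJO analysis happens, while `UUID` is a concrete class that lacks both a no-argument constructor and setters for its private final fields.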