dawidwys commented on a change in pull request #14619: URL: https://github.com/apache/flink/pull/14619#discussion_r559383036
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java ########## @@ -260,35 +261,70 @@ private static DataType convertToStructuredType( fieldNames[pos], toDataType(dataTypeFactory, compositeType.getTypeAt(pos)))); - final List<String> fieldNamesOrdered; + final List<String> fieldNamesReordered; final boolean isNullable; // for POJOs and Avro records if (compositeType instanceof PojoTypeInfo) { - // POJO serializer supports top-level nulls - isNullable = true; - // all fields are nullable - fieldDataTypes.replaceAll((name, dataType) -> dataType.nullable()); - final PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo<?>) compositeType; - final List<Field> fields = + final List<Field> pojoFields = IntStream.range(0, arity) .mapToObj(pojoTypeInfo::getPojoFieldAt) .map(PojoField::getField) .collect(Collectors.toList()); + + // POJO serializer supports top-level nulls + isNullable = true; + + // based on type information all fields are boxed classes, + // therefore we need to check the reflective field for more details + fieldDataTypes.replaceAll( + (name, dataType) -> { + final Class<?> fieldClass = + pojoFields.stream() + .filter(f -> f.getName().equals(name)) + .findFirst() + .orElseThrow(IllegalArgumentException::new) Review comment: nit: I'd rather use `IllegalStateException` as it is more likely an internal Flink error that the `TypeInformation` does not reflect its class. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/TypeInfoDataTypeConverter.java ########## @@ -260,35 +261,70 @@ private static DataType convertToStructuredType( fieldNames[pos], toDataType(dataTypeFactory, compositeType.getTypeAt(pos)))); - final List<String> fieldNamesOrdered; + final List<String> fieldNamesReordered; final boolean isNullable; // for POJOs and Avro records if (compositeType instanceof PojoTypeInfo) { - // POJO serializer supports top-level nulls - isNullable = true; - // all fields are nullable - fieldDataTypes.replaceAll((name, dataType) -> dataType.nullable()); - final PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo<?>) compositeType; - final List<Field> fields = + final List<Field> pojoFields = IntStream.range(0, arity) .mapToObj(pojoTypeInfo::getPojoFieldAt) .map(PojoField::getField) .collect(Collectors.toList()); + + // POJO serializer supports top-level nulls + isNullable = true; + + // based on type information all fields are boxed classes, + // therefore we need to check the reflective field for more details + fieldDataTypes.replaceAll( + (name, dataType) -> { + final Class<?> fieldClass = + pojoFields.stream() + .filter(f -> f.getName().equals(name)) + .findFirst() + .orElseThrow(IllegalArgumentException::new) + .getType(); + if (fieldClass.isPrimitive()) { + return dataType.bridgedTo(fieldClass); Review comment: nit: How about we call `notNull` here, just for clarity. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org