LuJiang created FLINK-36847: ------------------------------- Summary: Table API toDataStream() cannot be converted to an enum type field Key: FLINK-36847 URL: https://issues.apache.org/jira/browse/FLINK-36847 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.16.1 Environment: jdk: 1.8
flink: 1.16.1 Reporter: LuJiang toDataStream(table,clazz) will run fail when pojo Class contains enum fields.Exception occurs when searching for Class constructor: org.apache.flink.table.types.extraction.DataTypeExtractor#extractStructuredType {code:java} private DataType extractStructuredType( DataTypeTemplate template, List<Type> typeHierarchy, Type type) { final Class<?> clazz = toClass(type); if (clazz == null) { throw extractionError("Not a class type."); } validateStructuredClass(clazz); validateStructuredSelfReference(type, typeHierarchy); final List<Field> fields = collectStructuredFields(clazz); if (fields.isEmpty()) { throw extractionError("Class '%s' has no fields.", clazz.getName()); } // if not all fields are mutable, a default constructor is not enough final boolean allFieldsMutable = fields.stream() .allMatch( f -> { validateStructuredFieldReadability(clazz, f); return isStructuredFieldMutable(clazz, f); }); final ExtractionUtils.AssigningConstructor constructor = extractAssigningConstructor(clazz, fields); if (!allFieldsMutable && constructor == null) { throw extractionError( "Class '%s' has immutable fields and thus requires a constructor that is publicly " + "accessible and assigns all fields: %s", clazz.getName(), fields.stream().map(Field::getName).collect(Collectors.joining(", "))); } // check for a default constructor otherwise else if (constructor == null && !hasInvokableConstructor(clazz)) { throw extractionError( "Class '%s' has neither a constructor that assigns all fields nor a default constructor.", clazz.getName()); } final Map<String, DataType> fieldDataTypes = extractStructuredTypeFields(template, typeHierarchy, type, fields); final DataTypes.Field[] attributes = createStructuredTypeAttributes(constructor, fieldDataTypes); return DataTypes.STRUCTURED(clazz, attributes); }{code} extractAssigningConstructor cannot find any available constructor. It's easy to reproduce as following: {code:java} @Test public void testTable2Pojo() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); String sql = readFromClasspath("table2pojo.sql"); tEnv.executeSql(sql); Table table = tEnv.sqlQuery("select * from my_user"); DataStream<MyUser> dataStream = tEnv.toDataStream(table, MyUser.class); dataStream.process(new ProcessFunction<MyUser, String>() { @Override public void processElement(MyUser value, ProcessFunction<MyUser, String>.Context ctx, Collector<String> out) throws Exception { out.collect(JSON.toJSONString(value)); } }).addSink(new PrintSinkFunction<>()); env.execute(getClass().getSimpleName()); } @Data @Accessors(chain = true) public class MyUser { private String username; private Integer age; private MyExtInfo info; } @Data @Accessors(chain = true) public class MyExtInfo { private String family; private String myHobby; private MyJobType job; } public enum MyJobType { Teacher, Artist, Scientist; } // table2pojo.sql create table my_user ( username STRING, age INT, info ROW < my_hobby STRING, family STRING, job STRING > ) with ( 'connector' = 'kafka', 'value.format' ='json', 'topic'='test', 'properties.group.id'='test001', 'properties.auto.offset.reset'= 'latest', 'properties.bootstrap.servers'='localhost:50000', 'value.fields-include' ='ALL') {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)