LuJiang created FLINK-36847:
-------------------------------

             Summary: Table API toDataStream()  cannot be converted to an enum 
type field
                 Key: FLINK-36847
                 URL: https://issues.apache.org/jira/browse/FLINK-36847
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.16.1
         Environment: jdk: 1.8

flink: 1.16.1

 
            Reporter: LuJiang


toDataStream(table, clazz) fails when the POJO class contains enum 
fields. The exception occurs while searching for the class constructor:

org.apache.flink.table.types.extraction.DataTypeExtractor#extractStructuredType
{code:java}
private DataType extractStructuredType(
            DataTypeTemplate template, List<Type> typeHierarchy, Type type) {
        final Class<?> clazz = toClass(type);
        if (clazz == null) {
            throw extractionError("Not a class type.");
        }        validateStructuredClass(clazz);
        validateStructuredSelfReference(type, typeHierarchy);        final 
List<Field> fields = collectStructuredFields(clazz);        if 
(fields.isEmpty()) {
            throw extractionError("Class '%s' has no fields.", clazz.getName());
        }        // if not all fields are mutable, a default constructor is not 
enough
        final boolean allFieldsMutable =
                fields.stream()
                        .allMatch(
                                f -> {
                                    validateStructuredFieldReadability(clazz, 
f);
                                    return isStructuredFieldMutable(clazz, f);
                                });        final 
ExtractionUtils.AssigningConstructor constructor =
                extractAssigningConstructor(clazz, fields);
        if (!allFieldsMutable && constructor == null) {
            throw extractionError(
                    "Class '%s' has immutable fields and thus requires a 
constructor that is publicly "
                            + "accessible and assigns all fields: %s",
                    clazz.getName(),
                    
fields.stream().map(Field::getName).collect(Collectors.joining(", ")));
        }
        // check for a default constructor otherwise
        else if (constructor == null && !hasInvokableConstructor(clazz)) {
            throw extractionError(
                    "Class '%s' has neither a constructor that assigns all 
fields nor a default constructor.",
                    clazz.getName());
        }        final Map<String, DataType> fieldDataTypes =
                extractStructuredTypeFields(template, typeHierarchy, type, 
fields);        final DataTypes.Field[] attributes =
                createStructuredTypeAttributes(constructor, fieldDataTypes);    
    return DataTypes.STRUCTURED(clazz, attributes);
    }{code}
extractAssigningConstructor cannot find any available constructor.

 

It is easy to reproduce as follows:

 

 
{code:java}
@Test
public void testTable2Pojo() throws Exception {
    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    String sql = readFromClasspath("table2pojo.sql");
    tEnv.executeSql(sql);
    Table table = tEnv.sqlQuery("select * from my_user");
    DataStream<MyUser> dataStream = tEnv.toDataStream(table, MyUser.class);
    dataStream.process(new ProcessFunction<MyUser, String>() {
        @Override
        public void processElement(MyUser value, ProcessFunction<MyUser, 
String>.Context ctx, Collector<String> out) throws Exception {
            out.collect(JSON.toJSONString(value));
        }
    }).addSink(new PrintSinkFunction<>());
    env.execute(getClass().getSimpleName());
}

/**
 * Target POJO passed to {@code toDataStream(table, MyUser.class)} in the reproduction test.
 *
 * <p>Its fields line up with the columns of the {@code my_user} table: username, age and info.
 */
// NOTE(review): with chained accessors the generated setters presumably return `this` rather
// than void; confirm whether that affects how the fields are seen as mutable.
@Data
@Accessors(chain = true)
public class MyUser {
    private String username;
    private Integer age;
    // Nested value; the table declares the matching column as a ROW.
    private MyExtInfo info;
}

/**
 * Nested type of the {@code info} field of {@code MyUser}.
 *
 * <p>The table declares the matching column as
 * {@code ROW<my_hobby STRING, family STRING, job STRING>}.
 */
@Data
@Accessors(chain = true)
public class MyExtInfo {
    private String family;
    // NOTE(review): the table column is named my_hobby while this field is myHobby, and the
    // field order differs from the ROW order; confirm the intended mapping.
    private String myHobby;
    // Enum-typed field, whereas the table declares job as STRING; this is the enum field the
    // report refers to.
    private MyJobType job;
}

/** Enum used as the type of the {@code job} field of {@code MyExtInfo}. */
public enum MyJobType {
    Teacher,
    Artist,
    Scientist;
}

// table2pojo.sql
-- Table loaded by testTable2Pojo and queried with "select * from my_user".
-- Note that info.job is declared as STRING here, while MyExtInfo.job is the enum MyJobType.
create table my_user
(
    username   STRING,
    age    INT,
    info ROW <
        my_hobby STRING,
    family STRING,
    job    STRING >
)
with
    ( 'connector' = 'kafka',
        'value.format' ='json',
        'topic'='test',
        'properties.group.id'='test001',
        'properties.auto.offset.reset'= 'latest',
        'properties.bootstrap.servers'='localhost:50000',
        'value.fields-include' ='ALL')

{code}
 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to