LuJiang created FLINK-36847:
-------------------------------
Summary: Table API toDataStream() cannot be converted to an enum
type field
Key: FLINK-36847
URL: https://issues.apache.org/jira/browse/FLINK-36847
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.16.1
Environment: jdk: 1.8
flink: 1.16.1
Reporter: LuJiang
toDataStream(table,clazz) will run fail when pojo Class contains enum
fields.Exception occurs when searching for Class constructor:
org.apache.flink.table.types.extraction.DataTypeExtractor#extractStructuredType
{code:java}
private DataType extractStructuredType(
DataTypeTemplate template, List<Type> typeHierarchy, Type type) {
final Class<?> clazz = toClass(type);
if (clazz == null) {
throw extractionError("Not a class type.");
} validateStructuredClass(clazz);
validateStructuredSelfReference(type, typeHierarchy); final
List<Field> fields = collectStructuredFields(clazz); if
(fields.isEmpty()) {
throw extractionError("Class '%s' has no fields.", clazz.getName());
} // if not all fields are mutable, a default constructor is not
enough
final boolean allFieldsMutable =
fields.stream()
.allMatch(
f -> {
validateStructuredFieldReadability(clazz,
f);
return isStructuredFieldMutable(clazz, f);
}); final
ExtractionUtils.AssigningConstructor constructor =
extractAssigningConstructor(clazz, fields);
if (!allFieldsMutable && constructor == null) {
throw extractionError(
"Class '%s' has immutable fields and thus requires a
constructor that is publicly "
+ "accessible and assigns all fields: %s",
clazz.getName(),
fields.stream().map(Field::getName).collect(Collectors.joining(", ")));
}
// check for a default constructor otherwise
else if (constructor == null && !hasInvokableConstructor(clazz)) {
throw extractionError(
"Class '%s' has neither a constructor that assigns all
fields nor a default constructor.",
clazz.getName());
} final Map<String, DataType> fieldDataTypes =
extractStructuredTypeFields(template, typeHierarchy, type,
fields); final DataTypes.Field[] attributes =
createStructuredTypeAttributes(constructor, fieldDataTypes);
return DataTypes.STRUCTURED(clazz, attributes);
}{code}
extractAssigningConstructor cannot find any available constructor.
It's easy to reproduce as following:
{code:java}
@Test
public void testTable2Pojo() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String sql = readFromClasspath("table2pojo.sql");
tEnv.executeSql(sql);
Table table = tEnv.sqlQuery("select * from my_user");
DataStream<MyUser> dataStream = tEnv.toDataStream(table, MyUser.class);
dataStream.process(new ProcessFunction<MyUser, String>() {
@Override
public void processElement(MyUser value, ProcessFunction<MyUser,
String>.Context ctx, Collector<String> out) throws Exception {
out.collect(JSON.toJSONString(value));
}
}).addSink(new PrintSinkFunction<>());
env.execute(getClass().getSimpleName());
}
@Data
@Accessors(chain = true)
public class MyUser {
private String username;
private Integer age;
private MyExtInfo info;
}
@Data
@Accessors(chain = true)
public class MyExtInfo {
private String family;
private String myHobby;
private MyJobType job;
}
public enum MyJobType {
Teacher,
Artist,
Scientist;
}
// table2pojo.sql
create table my_user
(
username STRING,
age INT,
info ROW <
my_hobby STRING,
family STRING,
job STRING >
)
with
( 'connector' = 'kafka',
'value.format' ='json',
'topic'='test',
'properties.group.id'='test001',
'properties.auto.offset.reset'= 'latest',
'properties.bootstrap.servers'='localhost:50000',
'value.fields-include' ='ALL')
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)