[ https://issues.apache.org/jira/browse/FLINK-23865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zoucao updated FLINK-23865:
---------------------------
    Description: 
code:
{code:java}
// STRING() and INT() are assumed to be static imports from DataTypes.
Table table = tbEnv.fromValues(DataTypes.ROW(
        DataTypes.FIELD("innerPojo", DataTypes.ROW(DataTypes.FIELD("c", STRING()))),
        DataTypes.FIELD("b", STRING()),
        DataTypes.FIELD("a", INT())),
    Row.of(Row.of("str-c"), "str-b", 1));
DataStream<Pojo> pojoDataStream = tbEnv.toAppendStream(table, Pojo.class);

// -----------------------------

public static class Pojo {
    public InnerPojo innerPojo;
    public String b;
    public int a;

    public Pojo() {
    }
}

public static class InnerPojo {
    public String c;

    public InnerPojo() {
    }
}{code}
error:
{code:java}
java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType cannot be cast to org.apache.flink.table.types.logical.RowType
java.lang.ClassCastException: org.apache.flink.table.types.logical.IntType cannot be cast to org.apache.flink.table.types.logical.RowType
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:163)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$1.apply(TableSinkUtils.scala:155)
{code}
The fields of PojoTypeInfo are sorted alphabetically, so in `expandPojoTypeToSchema`, 'pojoType' and 'queryLogicalType' each need their own index. Currently the pojo field index is also used to look up the field in 'queryLogicalType', which causes the field type mismatch.

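The mismatch described above can be reproduced without Flink. The following is only a minimal sketch: it models the query schema and the alphabetically sorted POJO fields with plain Java lists instead of the real RowType and PojoTypeInfo, and the class and method names (FieldIndexSketch, typeByPojoIndex, typeByName) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FieldIndexSketch {
    // Query schema in declaration order, as in the report: innerPojo, b, a.
    static final List<String> QUERY_NAMES = Arrays.asList("innerPojo", "b", "a");
    static final List<String> QUERY_TYPES = Arrays.asList("ROW", "STRING", "INT");

    // Stand-in for the POJO side: the same field names, sorted alphabetically.
    static List<String> pojoNames() {
        List<String> sorted = new ArrayList<>(QUERY_NAMES);
        Collections.sort(sorted);
        return sorted;
    }

    // Problematic lookup: reuse the POJO field index against the query schema.
    static String typeByPojoIndex(String name) {
        return QUERY_TYPES.get(pojoNames().indexOf(name));
    }

    // Name-based lookup: resolve the index within the query schema itself.
    static String typeByName(String name) {
        return QUERY_TYPES.get(QUERY_NAMES.indexOf(name));
    }

    public static void main(String[] args) {
        System.out.println(pojoNames());                   // [a, b, innerPojo]
        System.out.println(typeByPojoIndex("innerPojo"));  // INT
        System.out.println(typeByName("innerPojo"));       // ROW
    }
}
```

In this model, "innerPojo" sits at index 2 on the POJO side but at index 0 in the query schema, so the index-based lookup returns the INT field, which would correspond to the IntType that cannot be cast to RowType in the stack trace.
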
It should be fixed like this:
{code:java}
val queryIndex = queryLogicalType.getFieldIndex(name)
val nestedLogicalType = queryLogicalType.getFields()(queryIndex).getType.asInstanceOf[RowType]{code}

> Class cast error caused by nested Pojo in legacy outputConversion
> -----------------------------------------------------------------
>
>                 Key: FLINK-23865
>                 URL: https://issues.apache.org/jira/browse/FLINK-23865
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: zoucao
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)