Hi haishui, The enum type cannot be mapped as flink table type directly.
I think the easiest way is to convert enum to string type first: DataStreamSource<Tuple2<String, String>> source = env.fromElements( new Tuple2<>("1", TestEnum.A.name()), new Tuple2<>("2", TestEnum.B.name()) ); Or add a map transformation: DataStream<Tuple2<String, String>> source1 = env.fromElements( new TestData("1", TestEnum.A), new TestData("2", TestEnum.B)) .map(t -> new Tuple2<>(t.s, t.en.name())) .returns(new TypeHint<Tuple2<String, String>>() {}); Hope it helps. Best, Jiabao On 2023/08/02 06:43:30 haishui wrote: > I want to convert dataStream to Table. The type of dataSream is a POJO, which > contains a enum field. > > > 1. The enum field is RAW('classname', '...') in table. When I execute `SELECT > * FROM t_test` and print the result, It throws EOFException. > 2. If I assign the field is STRING in schema, It throws cannot cast > "TestEnum" to "java.lang.String" > > > Is there any way to define the enum field as STRING in table? > > > My code is as follows: > Flink 1.17.1 > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > DataStreamSource<TestData> source = env.fromElements( > new TestData("1", TestEnum.A), > new TestData("2", TestEnum.B) > ); > Schema schema = Schema > .newBuilder() > .column("s", DataTypes.STRING()) > .column("en", DataTypes.STRING()) > .build(); > Table table = tableEnv.fromDataStream(source); > tableEnv.createTemporaryView("t_test", table); > tableEnv.executeSql("DESC t_test").print(); > tableEnv.executeSql("select * from t_test").print(); > @Data > @NoArgsConstructor > @AllArgsConstructor > public static class TestData { > private String s; > private TestEnum en; > } > > public enum TestEnum { > A, B, C > } > +----+--------------------------------+--------------------------------+ > | op | s | en | > +----+--------------------------------+--------------------------------+ > | +I | 1 | SqlRawValue{?} | > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: > java.io.EOFException > at > org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:66) > at GeneratedCastExecutor$1.cast(Unknown Source) > at > org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.lambda$init$0(RowDataToStringConverterImpl.java:74) > at > org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.convert(RowDataToStringConverterImpl.java:87) > at > org.apache.flink.table.utils.print.TableauStyle.rowFieldsToString(TableauStyle.java:167) > at > org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:148) > at > org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153) > Caused by: java.io.EOFException > at java.base/java.io.DataInputStream.readFully(DataInputStream.java:202) > at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170) > at > org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:96) > at > org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:36) > at > org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:505) > at > org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64) > ... 7 more