Hi haishui,

An enum field cannot be mapped to a Flink table type directly; fromDataStream treats it as a RAW type, which is why you see RAW('classname', '...') in the schema.

I think the easiest way is to convert the enum to a string when building the source:

DataStreamSource<Tuple2<String, String>> source = env.fromElements(
        new Tuple2<>("1", TestEnum.A.name()),
        new Tuple2<>("2", TestEnum.B.name())
);
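
Since both Tuple2 fields are now plain strings, they will map to STRING columns when you call fromDataStream.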

Or add a map transformation:

DataStream<Tuple2<String, String>> source1 =
        env.fromElements(
                new TestData("1", TestEnum.A),
                new TestData("2", TestEnum.B))
           // convert the enum field to its String name
           .map(t -> new Tuple2<>(t.getS(), t.getEn().name()))
           .returns(new TypeHint<Tuple2<String, String>>() {});
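
For completeness, a minimal sketch of registering the mapped stream and querying it (assuming the same tableEnv and view name as in your snippet; the Tuple2 fields default to f0/f1, so they are renamed here):

// both Tuple2 fields map to STRING columns; rename them to match your schema
Table table = tableEnv.fromDataStream(source1).as("s", "en");
tableEnv.createTemporaryView("t_test", table);
tableEnv.executeSql("DESC t_test").print();
tableEnv.executeSql("SELECT * FROM t_test").print();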

Hope it helps.

Best,
Jiabao


On 2023/08/02 06:43:30 haishui wrote:
> I want to convert a DataStream to a Table. The type of the DataStream is a POJO, which 
> contains an enum field.
> 
> 
> 1. The enum field is RAW('classname', '...') in the table. When I execute `SELECT 
> * FROM t_test` and print the result, it throws an EOFException.
> 2. If I declare the field as STRING in the schema, it throws: cannot cast 
> "TestEnum" to "java.lang.String".
> 
> 
> Is there any way to define the enum field as STRING in the table?
> 
> 
> My code is as follows:
> Flink 1.17.1
> 
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         DataStreamSource<TestData> source = env.fromElements(
>                 new TestData("1", TestEnum.A),
>                 new TestData("2", TestEnum.B)
>         );
>         Schema schema = Schema
>                 .newBuilder()
>                 .column("s", DataTypes.STRING())
>                 .column("en", DataTypes.STRING())
>                 .build();
>         Table table = tableEnv.fromDataStream(source);
>         tableEnv.createTemporaryView("t_test", table);
>         tableEnv.executeSql("DESC t_test").print();
>         tableEnv.executeSql("select * from t_test").print();
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public static class TestData {
>     private String s;
>     private TestEnum en;
> }
> 
> public enum TestEnum {
>     A, B, C
> }
> +----+--------------------------------+--------------------------------+
> | op |                              s |                             en |
> +----+--------------------------------+--------------------------------+
> | +I |                              1 |                 SqlRawValue{?} |
> Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: java.io.EOFException
>       at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:66)
>       at GeneratedCastExecutor$1.cast(Unknown Source)
>       at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.lambda$init$0(RowDataToStringConverterImpl.java:74)
>       at org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.convert(RowDataToStringConverterImpl.java:87)
>       at org.apache.flink.table.utils.print.TableauStyle.rowFieldsToString(TableauStyle.java:167)
>       at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:148)
>       at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> Caused by: java.io.EOFException
>       at java.base/java.io.DataInputStream.readFully(DataInputStream.java:202)
>       at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170)
>       at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:96)
>       at org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:36)
>       at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:505)
>       at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>       ... 7 more
