Hi community,

I'm currently converting a DataStream of Avro SpecificRecord type into a Table using the following method:

public static <T extends SpecificRecord> Table toTable(
    StreamTableEnvironment tEnv, DataStream<T> dataStream, Class<T> cls) {
  RichMapFunction<T, Row> avroSpecific2RowConverter = new RichMapFunction<>() {
    private transient AvroSerializationSchema<T> avro2bin = null;
    private transient AvroRowDeserializationSchema bin2row = null;

    @Override
    public void open(Configuration parameters) throws Exception {
      avro2bin = AvroSerializationSchema.forSpecific(cls);
      bin2row = new AvroRowDeserializationSchema(cls);
    }

    @Override
    public Row map(T value) throws Exception {
      byte[] bytes = avro2bin.serialize(value);
      Row row = bin2row.deserialize(bytes);
      return row;
    }
  };

  SingleOutputStreamOperator<Row> rows = dataStream.map(avroSpecific2RowConverter)
    // https://issues.apache.org/jira/browse/FLINK-23885
    .returns(AvroSchemaConverter.convertToTypeInfo(cls));

  return tEnv.fromDataStream(rows);
}

I'm wondering whether there's a pre-defined utility for that or a better way to do so in Flink-1.14.

Best,
Dongwon