Hi, Here's an example that works for me:
""" import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import static org.apache.flink.table.api.Expressions.*$*; import java.util.List; public class Stream2Table { public static void main(String[] args) { var streamingEnv = StreamExecutionEnvironment.*getExecutionEnvironment*(); var tableEnv = StreamTableEnvironment.*create*(streamingEnv); var userRows = streamingEnv.fromCollection( List.*of*( Row.*of*("user1", "al...@mail.org <mailto:us...@mail.org>", "Alice"), Row.*of*("user2", "b...@mail.org <mailto:us...@mail.org>", "Bob") ), new RowTypeInfo(Types.*STRING*, Types.*STRING*, Types.*STRING*)); var table = tableEnv .fromDataStream(userRows, *$*("user_id"), *$*("handle"), *$*("name")); table.execute().print(); } } """ You can also dig here, you'll probably find better examples https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table Cheers, Svend On Sun, 11 Apr 2021, at 3:35 PM, vtygoss wrote: > > Hi All, > > there is a scenario where I need to process OGG Log data in kafka using Flink > Sql. I can convert the OGG Log Stream to DataStream<RowData> and each event > has RowKind, but i have trouble converting DataStream<RowData> to a Table. > For test, i tried StreamTableEnvironment#fromDataStream and > createTemporaryView API, both TableSchema is > ``` > root > |-- f0: LEGACY('RAW', 'ANY<org.apache.flink.table.data.RowData>') > ``` > > i want to get the schema : > > ``` > root > |— column1: Type, > |— column2: Type, > …. > ``` > > > how to convert DataStream<RowData> with RowKind to Table? > > > Thank you very much for your reply >