Hi,
Here's an example that works for me:
"""
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*$*;
import java.util.List;
public class Stream2Table {
public static void main(String[] args) {
var streamingEnv =
StreamExecutionEnvironment.*getExecutionEnvironment*();
var tableEnv = StreamTableEnvironment.*create*(streamingEnv);
var userRows = streamingEnv.fromCollection(
List.*of*(
Row.*of*("user1", "[email protected]
<mailto:[email protected]>", "Alice"),
Row.*of*("user2", "[email protected]
<mailto:[email protected]>", "Bob")
),
new RowTypeInfo(Types.*STRING*, Types.*STRING*,
Types.*STRING*));
var table = tableEnv
.fromDataStream(userRows,
*$*("user_id"), *$*("handle"), *$*("name"));
table.execute().print();
}
}
"""
You can also dig here, you'll probably find better examples
https://github.com/apache/flink/tree/master/flink-examples/flink-examples-table
Cheers,
Svend
On Sun, 11 Apr 2021, at 3:35 PM, vtygoss wrote:
>
> Hi All,
>
> there is a scenario where I need to process OGG Log data in kafka using Flink
> Sql. I can convert the OGG Log Stream to DataStream<RowData> and each event
> has RowKind, but i have trouble converting DataStream<RowData> to a Table.
> For test, i tried StreamTableEnvironment#fromDataStream and
> createTemporaryView API, both TableSchema is
> ```
> root
> |-- f0: LEGACY('RAW', 'ANY<org.apache.flink.table.data.RowData>')
> ```
>
> i want to get the schema :
>
> ```
> root
> |— column1: Type,
> |— column2: Type,
> ….
> ```
>
>
> how to convert DataStream<RowData> with RowKind to Table?
>
>
> Thank you very much for your reply
>