Re: Re: How to use both of SQL and DataStream in 1.11

2020-11-03 Thread Danny Chan
You can still use the .sqlQuery(...) to create a common table there, then converts the table into a DataStream, with this DataStream, you can add the multiple sink functions you like. izual 于2020年11月2日周一 下午5:18写道: > Hi, Danny: > > > Thanks for your help. > > > As in the question, some result wa

Re:Re: How to use both of SQL and DataStream in 1.11

2020-11-02 Thread izual
Hi, Danny: Thanks for your help. As in the question, some result was saved using DataStream API: ``` table.toAppendStream[Row].addSink(new MyStreamSink) class MyStreamSink extends RichSinkFunction[Row] { override def invoke(r: Row): Unit = { // save result } } ``` So if us

Re: How to use both of SQL and DataStream in 1.11

2020-11-02 Thread Danny Chan
You can still convert the datastream to table and register it with method void TableEnvironment.createTemporaryView(String path, Table view) Then create a StatementSet with StatementSet TableEnvironment.createStatementSet(), With the StatementSet, you can execute multiple insert statements alto

How to use both of SQL and DataStream in 1.11

2020-11-01 Thread izual
Hi, community: We used flink 1.9.1, both SQL and DataStream API to support multiple sinks for product envs. For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and table.toAppendStream[Row].addSink(new RichSinkFunction[Row] {...}).name("dest2"), and env.execute() to submit t