You can still use .sqlQuery(...) to create an ordinary table there, then convert
the table into a DataStream. With this DataStream, you can add as many sink
functions as you like.

izual <izual...@163.com> wrote on Mon, Nov 2, 2020 at 5:18 PM:

> Hi, Danny:
>
> Thanks for your help.
>
> As mentioned in the question, some results were saved using the DataStream API:
>
> ```
> table.toAppendStream[Row].addSink(new MyStreamSink)
>
> class MyStreamSink extends RichSinkFunction[Row] {
>   override def invoke(r: Row): Unit = {
>     // save result
>   }
> }
> ```
>
> So if I use `StatementSet.addInsert`, must I use UpsertStreamTableSink and
> StreamTableSourceFactory to wrap the RichSinkFunction?
>
> Is there a way to keep using the DataStream API in the table environment? It
> is more expressive.
>
> At 2020-11-02 16:53:22, "Danny Chan" <danny0...@apache.org> wrote:
>
> > You can still convert the DataStream to a table and register it with the
> > method
> >
> > void TableEnvironment.createTemporaryView(String path, Table view)
> >
> > Then create a StatementSet with
> >
> > StatementSet TableEnvironment.createStatementSet(),
> >
> > With the StatementSet, you can execute multiple insert statements
> > together, and then submit the job with
> >
> > TableResult StatementSet.execute()
> >
> > izual <izual...@163.com> wrote on Mon, Nov 2, 2020 at 11:28 AM:
> >
> >> Hi, community:
> >>
> >> We used Flink 1.9.1, with both SQL and the DataStream API, to support
> >> multiple sinks in production environments.
> >>
> >> For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
> >> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
> >> {...}).name("dest2"), with env.execute() to submit the DAG together, and
> >> the result will sink to dest1 or dest2 or both.
> >>
> >> Now I am trying to upgrade Flink to 1.11.2. Following the Attention note
> >> in [1], I use tableEnv.execute() instead of env.execute(), but I only get
> >> the result of `sqlUpdate`; the result of `DataStream.addSink` is missing.
> >>
> >> 1. How can I get both results in mixed SQL/DataStream use cases? Maybe
> >> changing every RichSinkFunction into an UpsertTable would work. Is there
> >> another, simpler way to do this?
> >>
> >> 2. It seems that env.getExecutionPlan now only returns the DAG of the
> >> DataStream API, so how can I get the whole DAG, as env.getExecutionPlan()
> >> did in 1.9.1?
> >>
> >> Thanks for your reply.
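
P.S. The suggestion at the top of this message (run .sqlQuery(...), convert the
result to a DataStream, then attach several sinks) could look roughly like the
sketch below. It assumes Flink 1.11 with the Scala bridge API on the classpath.
The view name `src`, the columns `id` and `payload`, the `PrintingSink` class,
and the job name are hypothetical placeholders for illustration, not part of
the original job.

```scala
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Hypothetical sink standing in for MyStreamSink: it only prints each row.
class PrintingSink(label: String) extends RichSinkFunction[Row] {
  override def invoke(r: Row): Unit = {
    println(s"$label: $r")
  }
}

object MultiSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Hypothetical input, registered as a view so that SQL can read it.
    val input: DataStream[(Int, String)] = env.fromElements((1, "a"), (2, "b"))
    tableEnv.createTemporaryView("src", input, $"id", $"payload")

    // Build the result with SQL, then leave the Table API.
    val table: Table = tableEnv.sqlQuery("SELECT id, payload FROM src")
    val stream: DataStream[Row] = table.toAppendStream[Row]

    // Attach as many DataStream sinks as needed to the same stream.
    stream.addSink(new PrintingSink("dest1")).name("dest1")
    stream.addSink(new PrintingSink("dest2")).name("dest2")

    // All sinks are DataStream sinks here, so the job is submitted via env.
    env.execute("multi-sink sketch")
  }
}
```

Because every sink in this sketch hangs off the DataStream, the whole pipeline
is part of the StreamExecutionEnvironment's DAG and is submitted once with
env.execute().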