You can still convert the datastream to table and register it with method void TableEnvironment.createTemporaryView(String path, Table view)
Then create a StatementSet with StatementSet TableEnvironment.createStatementSet(), With the StatementSet, you can execute multiple insert statements altogether, and then submit the job with TableResult StatementSet.execute() izual <izual...@163.com> 于2020年11月2日周一 上午11:28写道: > Hi, community: > > > We used flink 1.9.1, both SQL and DataStream API to support multiple > sinks for product envs. > > For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and > table.toAppendStream[Row].addSink(new RichSinkFunction[Row] > {...}).name("dest2"), and env.execute() to submit the DAG together, and > result will sink to dest1 or dest2 or both. > > > Now I try to update flink to 1.11.2, according to [1]-Attention,use > tableEnv.execute() instead of env.execute(), but only get the result of > `sqlUpdate`, and result of `DataStream.addSink` is missed. > > > 1. How to get both the results in mixed SQL/DataStream use cases, maybe > change all RichSinkFunction into a UpsertTable works. Is there another > simple way to do this? > > 2. It seems like env.getExecutionPlan only returns DAG of DataStream API > now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1. > > > Thanks for ur reply. > > > >