You can still convert the DataStream to a Table and register it with the method

void TableEnvironment.createTemporaryView(String path, Table view)

Then create a StatementSet with

StatementSet TableEnvironment.createStatementSet()

With the StatementSet, you can group multiple INSERT statements together
and then submit them as a single job with

TableResult StatementSet.execute()
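
A rough sketch of the whole flow on Flink 1.11 (the source DataStream, the
view name "my_view", and the sink tables dest1/dest2 are placeholders for
illustration; dest1 and dest2 would have to be registered beforehand, e.g.
via executeSql DDL):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MultiSinkJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Placeholder DataStream; replace with your real source.
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // Convert the DataStream into a Table and register it as a
        // temporary view.
        Table table = tableEnv.fromDataStream(stream);
        tableEnv.createTemporaryView("my_view", table);

        // Group several INSERT statements into one StatementSet
        // (dest1/dest2 must already exist as catalog tables).
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO dest1 SELECT * FROM my_view");
        statementSet.addInsertSql("INSERT INTO dest2 SELECT * FROM my_view");

        // Submit all the inserts together as a single job.
        TableResult result = statementSet.execute();
    }
}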

izual <izual...@163.com> wrote on Mon, Nov 2, 2020 at 11:28 AM:

> Hi, community:
>
>
>   We use Flink 1.9.1 with both the SQL and DataStream APIs to support
> multiple sinks in our production environments.
>
>   For example, we call tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
> {...}).name("dest2"), then env.execute() to submit the DAG as one job, and
> the results are written to dest1, dest2, or both.
>
>
>   Now I am trying to upgrade Flink to 1.11.2. Following the Attention note
> in [1], I use tableEnv.execute() instead of env.execute(), but I only get
> the result of `sqlUpdate`, while the result of `DataStream.addSink` is
> missing.
>
>
>   1. How can I get both results in mixed SQL/DataStream use cases? Maybe
> changing all RichSinkFunctions into upsert tables would work. Is there
> another, simpler way to do this?
>
>   2. It seems that env.getExecutionPlan() now only returns the DAG of the
> DataStream API, so how can I get the whole DAG, as env.getExecutionPlan()
> did in 1.9.1?
>
>
>   Thanks for your reply.
>
>
>
>