You can still use .sqlQuery(...) to create an ordinary table, then
convert that table into a DataStream.

With this DataStream, you can add as many sink functions as you like.
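
For illustration, here is a minimal sketch of that approach, assuming the Flink 1.11 Scala API. The in-memory source, the view name `src`, the column names and the sink labels are made up for the example; MyStreamSink mirrors the sink from your mail. Because the table is converted to a DataStream, I would expect the job to be submitted with env.execute(), but please verify this against your setup.

```scala
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object MultiSinkSketch {

  // Same shape as the sink in the quoted mail; the body is a placeholder.
  class MyStreamSink extends RichSinkFunction[Row] {
    override def invoke(r: Row): Unit = {
      // save result
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Hypothetical input, registered as a view so that SQL can query it.
    val input: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2))
    tableEnv.createTemporaryView("src", input, $"name", $"cnt")

    // Build the result with SQL, then leave the Table API.
    val table: Table = tableEnv.sqlQuery("SELECT name, cnt FROM src WHERE cnt > 0")
    val rows: DataStream[Row] = table.toAppendStream[Row]

    // Attach as many sinks as needed to the same stream.
    rows.addSink(new MyStreamSink).name("dest1")
    rows.print().name("dest2")

    // Submit the whole DataStream DAG, including both sinks.
    env.execute("multi-sink sketch")
  }
}
```

With this layout every result goes through a DataStream sink, so there is no INSERT INTO statement left for the table environment to run separately.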

On Mon, Nov 2, 2020 at 5:18 PM, izual <> wrote:

> Hi, Danny:
> Thanks for your help.
> As mentioned in the question, some results are saved using the DataStream API:
> ```
> table.toAppendStream[Row].addSink(new MyStreamSink)
>
> class MyStreamSink extends RichSinkFunction[Row] {
>   override def invoke(r: Row): Unit = {
>     // save result
>   }
> }
> ```
> So if I use `StatementSet.addInsert`, do I have to use
> UpsertStreamTableSink and StreamTableSourceFactory to wrap the
> RichSinkFunction?
> Is there a way to keep using the DataStream API, which is more
> expressive, in the table environment?
> At 2020-11-02 16:53:22, "Danny Chan" <> wrote:
> You can still convert the DataStream to a table and register it with the method
> void TableEnvironment.createTemporaryView(String path, Table view)
> Then create a StatementSet with
> StatementSet TableEnvironment.createStatementSet().
> With the StatementSet, you can execute multiple insert statements
> together,
> and then submit the job with
> TableResult StatementSet.execute()
> On Mon, Nov 2, 2020 at 11:28 AM, izual <> wrote:
>> Hi, community:
>>   We use Flink 1.9.1 with both SQL and the DataStream API to support
>> multiple sinks in our production environments.
>>   For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
>> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
>> {...}).name("dest2"), with env.execute() submitting the whole DAG, so
>> results are written to dest1, dest2, or both.
>>   Now I am trying to upgrade Flink to 1.11.2. Following [1]-Attention, I use
>> tableEnv.execute() instead of env.execute(), but I only get the result of
>> `sqlUpdate`; the result of `DataStream.addSink` is missing.
>>   1. How can I get both results in mixed SQL/DataStream use cases? Maybe
>> changing every RichSinkFunction into an UpsertTable would work. Is there a
>> simpler way to do this?
>>   2. It seems that env.getExecutionPlan now only returns the DAG of the
>> DataStream API, so how can I get the whole DAG, as env.getExecutionPlan()
>> returned in 1.9.1?
>>   Thanks for your reply.
