You can still use the .sqlQuery(...) to create a common table there, then
converts the table into a DataStream,

with this DataStream, you can add the multiple sink functions you like.

izual <izual...@163.com> 于2020年11月2日周一 下午5:18写道:

> Hi, Danny:
>
>
> Thanks for your help.
>
>
> As in the question, some result was saved using DataStream API:
>
>
> ```
>
> table.toAppendStream[Row].addSink(new MyStreamSink)
>
>
> class MyStreamSink extends RichSinkFunction[Row] {
>
> override def invoke(r: Row): Unit = {
>
> // save result
>
> }
>
> }
>
> ```
>
>
> So if use `StatementSet.addInsert`, should I must use
> UpsertStreamTableSink and StreamTableSourceFactory to wrap the
> RichSinkFunction?
>
> Is there a way to keep using DataStream API in table environment? which is
> more expressive.
>
>
>
>
>
>
> At 2020-11-02 16:53:22, "Danny Chan" <danny0...@apache.org> wrote:
>
> You can still convert the datastream to table and register it with method
>
> void TableEnvironment.createTemporaryView(String path, Table view)
>
> Then create a StatementSet with
>
> StatementSet TableEnvironment.createStatementSet(),
>
> With the StatementSet, you can execute multiple insert statements
> altogether,
> and then submit the job with
>
> TableResult StatementSet.execute()
>
> izual <izual...@163.com> 于2020年11月2日周一 上午11:28写道:
>
>> Hi, community:
>>
>>
>>   We used flink 1.9.1, both SQL and DataStream API to support multiple
>> sinks for product envs.
>>
>>   For example, tableEnv.sqlUpdate("INSERT INTO dest1 SELECT ...") and
>> table.toAppendStream[Row].addSink(new RichSinkFunction[Row]
>> {...}).name("dest2"), and env.execute() to submit the DAG together, and
>> result will sink to dest1 or dest2 or both.
>>
>>
>>   Now I try to update flink to 1.11.2, according to [1]-Attention,use
>> tableEnv.execute() instead of env.execute(), but only get the result of
>> `sqlUpdate`, and result of `DataStream.addSink` is missed.
>>
>>
>>   1. How to get both the results in mixed SQL/DataStream use cases, maybe
>> change all RichSinkFunction into a UpsertTable works. Is there another
>> simple way to do this?
>>
>>   2. It seems like env.getExecutionPlan only returns DAG of DataStream
>> API now, so how to get the whole DAG like env.getExecutionPlan() in 1.9.1.
>>
>>
>>   Thanks for ur reply.
>>
>>
>>
>>
>
>
>
>

Reply via email to