Hi,

There may be a misunderstanding about the upsert sink. Take a look at
[1]: an upsert stream can also carry "delete" records, and the upsert sink
handles them.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
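
To make the "delete" handling concrete, here is a small Python simulation of
your query. It is only a sketch of the idea described in [1], not Flink's
actual sink code: the names run_query and apply_to_sink, the sample events,
and the dict standing in for the MySQL table are all made up for
illustration. It assumes the sink is keyed on status (the outer GROUP BY
column).

```python
from collections import defaultdict


def run_query(events):
    """Simulate the nested GROUP BY query and yield upsert-style messages.

    events: iterable of (order_no, status) rows, as read from kafkaTable.
    Yields ("UPSERT", status, num) or ("DELETE", status), keyed on status.
    """
    last_status = {}           # inner query: LAST_VALUE(status) per order_no
    counts = defaultdict(int)  # outer query: COUNT(order_no) per status

    for order_no, status in events:
        old = last_status.get(order_no)
        if old == status:
            continue  # the order's status did not change, nothing to emit
        if old is not None:
            # The order leaves its previous status group.
            counts[old] -= 1
            if counts[old] == 0:
                del counts[old]
                yield ("DELETE", old)  # the group is now empty
            else:
                yield ("UPSERT", old, counts[old])
        last_status[order_no] = status
        counts[status] += 1
        yield ("UPSERT", status, counts[status])


def apply_to_sink(table, message):
    """Apply one message to a dict that stands in for the MySQL table."""
    if message[0] == "UPSERT":
        _, key, num = message
        table[key] = num  # insert the row or overwrite it by key
    else:
        table.pop(message[1], None)  # delete the row by key


# Hypothetical sample input: two orders, both move from created to shipped.
events = [("o1", "created"), ("o2", "created"),
          ("o1", "shipped"), ("o2", "shipped")]
mysql_table = {}
messages = list(run_query(events))
for m in messages:
    apply_to_sink(mysql_table, m)
print(mysql_table)
```

Once the last order leaves the "created" group, that row is removed by a
delete message keyed on status, so the table ends up holding only the
"shipped" row. No separate retract message for the old row is needed.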

Best,
Jingsong Lee

On Wed, Mar 25, 2020 at 11:37 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> This can be an upsert stream [1], and the JDBC connector has an upsert
> sink now [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>
> Best,
> Jingsong Lee
>
> On Wed, Mar 25, 2020 at 11:14 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
>> Hi,
>>
>> This can be an upsert stream [1]
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Mar 25, 2020 at 11:12 AM wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>>>
>>> I created one table backed by Kafka and another backed by MySQL using
>>> Flink SQL, then wrote a SQL statement that reads from Kafka and writes
>>> to MySQL.
>>>
>>> INSERT INTO mysqlTable
>>> SELECT status, COUNT(order_no) AS num
>>> FROM (SELECT order_no, LAST_VALUE(status) AS status
>>>       FROM kafkaTable
>>>       GROUP BY order_no)
>>> GROUP BY status
>>>
>>> I think this is a retract stream.
>>>
>>> But where can I find the Java source code for a MySQL retract table sink?
>>>
>>>
>>> Thanks,
>>>
>>> Lei
>>>
>>>
>>> ------------------------------
>>> wangl...@geekplus.com.cn
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee
