So when t=C2 arrives, the source connector must send a `DELETE` message downstream saying that row C should be deleted, and a new `INSERT` message to notify downstream that a new row D should be inserted into the sink. This source connector is just like a CDC source, but it seems you need to customize it yourself. The `DELETE` message for row C is a RowData whose RowKind is `DELETE`. When the sink receives this `DELETE` message, it will tell the DB to delete the row, either by primary key or, if the table has no primary key, by matching the whole row.
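A minimal sketch of the behaviour described above, in plain Python rather than Flink: it diffs the C1 and C2 crawls into `DELETE`/`INSERT` events and applies them to an in-memory dict standing in for the sink table. The names `diff_crawls` and `apply_to_sink` are hypothetical and not part of any Flink API; the sketch also assumes every resource id serves as the primary key.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical constants for illustration; the strings mirror Flink's RowKind names.
INSERT = "INSERT"
DELETE = "DELETE"

ChangelogEvent = Tuple[str, str]  # (row kind, primary key)


def diff_crawls(previous: Iterable[str], current: Iterable[str]) -> List[ChangelogEvent]:
    """Compare two crawl snapshots and return the changelog between them."""
    prev_keys, curr_keys = set(previous), set(current)
    # Keys seen in the previous crawl but missing now must be deleted downstream.
    deletes = [(DELETE, key) for key in sorted(prev_keys - curr_keys)]
    # Keys that first appear in this crawl must be inserted downstream.
    inserts = [(INSERT, key) for key in sorted(curr_keys - prev_keys)]
    return deletes + inserts


def apply_to_sink(sink: Dict[str, dict], events: Iterable[ChangelogEvent]) -> None:
    """Apply changelog events to a dict keyed by primary key (a stand-in for the DB)."""
    for kind, key in events:
        if kind == DELETE:
            sink.pop(key, None)  # delete by primary key
        elif kind == INSERT:
            sink[key] = {"id": key}


sink = {key: {"id": key} for key in ("A", "B", "C")}    # sink state after crawl C1
events = diff_crawls(["A", "B", "C"], ["A", "B", "D"])  # crawl C2 arrives
apply_to_sink(sink, events)
```

Here `events` holds one `DELETE` for C and one `INSERT` for D, and the sink ends up with rows A, B and D. In a real job the diff would live in the custom source (or UDTF) and the sink connector would perform the delete.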
--
Best!
Xuyang

At 2022-06-14 19:45:06, "John Tipper" <john_tip...@hotmail.com> wrote:

Yes, I’m interested in the best pattern to follow with SQL to allow a downstream DB, written via the JDBC SQL connector, to reflect the state of rows added and deleted upstream.

So imagine there is a crawl event at t=C1 that happens with an associated timestamp and which finds resources A, B, C. Is it better to emit one event into the stream with an array of all resources, or many events, each with one resource and a corresponding crawl timestamp? There is obviously a limit to the amount of data that can be in a single event, so the latter pattern will scale better for many resources.

Flink SQL sees this stream and processes it, then emits to a JDBC sink where there is one row each for A, B, C.

Later, at t=C2, another crawl happens, finding A, B, D. I want the sink DB to have 3 rows if possible and not have C. Alternatively, it should have 4 rows with a tombstone/delete marker on row C so it’s obvious it doesn’t exist any more.

I’m interested in a SQL solution if possible.

J

On 9 Jun 2022, at 11:20, Xuyang <xyzhong...@163.com> wrote:

Hi, Dian Fu.

I think John's requirement is like a CDC source: the source needs to know which data should be deleted and then notify the framework, which is why I recommended that John use a UDTF.

And hi, John.

I'm not sure this doc [1] is enough. BTW, I think you can also try to customize a connector [2] in Java that sends `DELETE` RowData downstream, and then use it in PyFlink SQL; that may be easier.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/udfs/python_udfs/#table-functions
[2] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/#user-defined-sources--sinks

--
Best!
Xuyang

At 2022-06-09 08:53:36, "Dian Fu" <dian0511...@gmail.com> wrote:

Hi John,

If you are using Table API & SQL, the framework handles the RowKind and it's transparent to you.
So usually you don't need to handle RowKind in Table API & SQL.

Regards,
Dian

On Thu, Jun 9, 2022 at 6:56 AM John Tipper <john_tip...@hotmail.com> wrote:

Hi Xuyang,

Thank you very much, I’ll experiment tomorrow. Do you happen to know whether there is a Python example of udtf() with a RowKind being set (or whether it’s supported)?

Many thanks,
John

On 8 Jun 2022, at 16:41, Xuyang <xyzhong...@163.com> wrote:

Hi, John.

What about using a UDTF [1]? In your UDTF, all resources are saved in a set or map, call it s1. When t=2 arrives, the new resources collected by the crawl form s2. I think what you want is the deleted data, that is, 's1' - 's2'. So just loop to find the deleted data and send it as RowData in the UDTF's 'eval' function; the RowData can be sent with the RowKind 'DELETE' [2], which tells the downstream that this value is deleted.

I will be glad if it can help you.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52

--
Best!
Xuyang

At 2022-06-08 20:06:17, "John Tipper" <john_tip...@hotmail.com> wrote:

Hi all,

I have some reference data that is periodically emitted by a crawler mechanism into an upstream Kinesis data stream, where those rows are used to populate a sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data Analytics). What is the best pattern to handle deletion of upstream data, such that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting in a DB with 3 rows. At some point between t=1 and t=2, the resource corresponding to R2 was deleted, such that at t=2, when the next crawl was carried out, only rows R1 and R3 were emitted into the upstream stream.
How should I process the stream of events so that when I have finished processing the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,
John
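One way to picture the question above is the one-event-per-resource pattern, where each event carries its crawl timestamp. The plain-Python sketch below groups events by crawl and reports which resources vanished between consecutive crawls. It is not Flink code: the function name `deletions_per_crawl` is hypothetical, and it assumes events arrive ordered by crawl timestamp, so a crawl counts as finished when the first event of a later crawl shows up (or when the input ends). On an unbounded stream, some other completion signal, such as a timer or watermark, would be needed for the last crawl.

```python
from typing import Iterable, Iterator, List, Optional, Set, Tuple

CrawlEvent = Tuple[int, str]  # (crawl timestamp, resource id)


def deletions_per_crawl(events: Iterable[CrawlEvent]) -> Iterator[Tuple[int, List[str]]]:
    """Yield (crawl timestamp, deleted resource ids) once each crawl is complete.

    Assumption: events are ordered by crawl timestamp, so seeing a new
    timestamp means the previous crawl has been fully received.
    """
    previous: Set[str] = set()   # resources seen in the last completed crawl
    current: Set[str] = set()    # resources seen so far in the crawl in progress
    current_ts: Optional[int] = None
    for ts, resource in events:
        if current_ts is not None and ts != current_ts:
            # The crawl in progress is complete: report what disappeared.
            yield current_ts, sorted(previous - current)
            previous, current = current, set()
        current_ts = ts
        current.add(resource)
    if current_ts is not None:
        # Flush the final crawl when the (bounded) input ends.
        yield current_ts, sorted(previous - current)


stream = [(1, "R1"), (1, "R2"), (1, "R3"), (2, "R1"), (2, "R3")]
result = list(deletions_per_crawl(stream))
```

For the example in the question, the crawl at t=1 reports no deletions and the crawl at t=2 reports R2, which is the row that would need a `DELETE` downstream.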