I see what my mistake was: I was using a field in my ORDER BY, while it only supports proctime() for now.
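Side note, in case it is useful below: if I understand correctly, a processing-time attribute can also be declared as a computed column in the source DDL and then referenced by name in ORDER BY. A minimal, untested sketch on my customers table, where the extra proc column is the only addition:

CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name`          VARCHAR(100),
  `address`       VARCHAR(100),
  `proc` AS PROCTIME()  -- processing-time attribute, referenceable in ORDER BY / MATCH_RECOGNIZE
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);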
Using proctime() allows me to create an append-only stream, thanks a lot! However, it still does not allow me to do what I need:

*If I use both my primary key and the changing column in PARTITION BY, then it does not allow me to come back to a previous value of my changed column:*

SELECT client_number
     , address
     , proctime() AS load_date
FROM (
    SELECT client_number
         , address
         , ROW_NUMBER() OVER (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
    FROM customers
) WHERE rownum = 1;

*input:*                                           *output:*
| client_number | address | load_date |            | client_number | address | load_date |
| ------------- | ------- | --------- |            | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |    -->     | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |    -->     | 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |   *-->     | 1             | addr1   | ts5       | <-- this one does not show --> I cannot change back to a previous value :(*
| 1             | addr1   | ts6       |

*If however I only put my primary key in PARTITION BY, then I only get the first value of my changed column:*

SELECT client_number
     , address
     , proctime() AS load_date
FROM (
    SELECT client_number
         , address
         , ROW_NUMBER() OVER (PARTITION BY client_number ORDER BY proctime() ASC) AS rownum
    FROM customers
) WHERE rownum = 1;

*input:*                                           *output:*
| client_number | address | load_date |            | client_number | address | load_date |
| ------------- | ------- | --------- |            | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |    -->     | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |   *-->     | 1             | addr2   | ts3       | <-- this one does not show :(*
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |   *-->     | 1             | addr1   | ts5       | <-- this one does not show :(*
| 1             | addr1   | ts6       |

Basically, I need to deduplicate, *but only keeping in the deduplication state the latest value of the changed column* to compare with, while here it seems to keep all previous values...

Is there a way to obtain the behavior I need, with this deduplication method or another one? For example, would something along the lines of the MATCH_RECOGNIZE sketch below be a reasonable direction?
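A rough, untested sketch of the kind of change detection I have in mind, using MATCH_RECOGNIZE. It assumes the proc processing-time attribute from the DDL sketch above, and it is not a complete solution: the very first address per client would never be emitted, since that row never plays the role of B in the pattern. As far as I understand, MATCH_RECOGNIZE produces an append-only stream, which is why it looks attractive here:

SELECT T.client_number
     , T.address
     , T.load_date
FROM customers
MATCH_RECOGNIZE (
    PARTITION BY client_number
    ORDER BY proc
    MEASURES
        B.address AS address,
        B.proc    AS load_date
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST B
    PATTERN (A B)
    DEFINE
        B AS B.address <> A.address  -- match only when the address actually changes
) AS T;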
Thanks in advance.

Best Regards,

Laurent.


On Thu, 12 Nov 2020 at 12:48, Jark Wu <imj...@gmail.com> wrote:

> Hi Laurent,
>
> 1. Currently, it's impossible to convert deduplicate with last row into an
> append-only stream.
> 2. Yes, I think the Ververica platform doesn't support the 'changelog-json'
> format natively.
>
> However, regarding your case, I think you can keep the first row on the
> client_number + address key:
>
> SELECT *
> FROM (
>   SELECT client_number, address, load_date,
>     ROW_NUMBER() OVER
>       (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
>   FROM src)
> WHERE rownum = 1
>
> That means the duplicate records on the same client_number + address will
> be ignored, but a new value of address will be emitted as an append-only
> stream.
>
> Hope this helps you.
>
> Best,
> Jark
>
>
> On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hi Jark,
>>
>> thanks again for your quick response!
>>
>> I tried multiple variants of my query by:
>> - specifying only the primary key in the PARTITION BY clause
>> - changing the order to DESC to keep the last row
>>
>> --> I unfortunately always get the same error message.
>> If I try to make a simple select on the result of this query, I also get
>> the following error: The submitted query is not an append-only query. Only
>> queries producing exclusively new rows over time are supported at the
>> moment.
>> So whatever change I make, I never get an append-only query --> is there
>> something I missed?
>>
>> I also tried to write to Kafka as changelog-json, but I got the answer: The
>> sink connector for table `vvp`.`default`.`sat_customers_address` could not
>> be created. 'changelog-json' is not a supported sink format. Supported sink
>> formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
>> (Maybe because I'm on the Ververica platform?)
>> This also seems to require an extra Kafka topic, so it is not ideal.
>>
>> *I'm starting to wonder if the deduplication query is really what I need.*
>>
>> What I need is:
>> - to forward only the records where some columns (ideally configurable)
>> change for a specific primary key;
>> - in real time (no windowing);
>> - and to have an append-only stream as a result.
>>
>> Like this:
>>
>> *input:*                                        *output* (what should ultimately be published to Kafka and later inserted in a RDBMS):
>> | client_number | address | load_date |         | client_number | address | load_date |
>> | ------------- | ------- | --------- |         | ------------- | ------- | --------- |
>> | 1             | addr1   | ts1       |   -->   | 1             | addr1   | ts1       |
>> | 1             | addr1   | ts2       |
>> | 1             | addr2   | ts3       |   -->   | 1             | addr2   | ts3       |
>> | 1             | addr2   | ts4       |
>> | 1             | addr1   | ts5       |   -->   | 1             | addr1   | ts5       |
>> | 1             | addr1   | ts6       |
>>
>> --> Is this deduplication query therefore the right fit?
>> - If yes, how should it be written to generate an append-only stream?
>> - If not, are there other options? (Match Recognize, UDF, ...?)
>>
>> Thanks a lot for your much appreciated help :).
>>
>> Best Regards,
>>
>> Laurent.
>>
>>
>> On Thu, 12 Nov 2020 at 07:26, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Laurent,
>>>
>>> > What I want is a record to be forwarded only if some of the columns
>>> change
>>>
>>> IIUC, what you want is still deduplication with the last row.
>>> Keeping the first row will drop all the duplicate rows on the same primary
>>> key.
>>> Keeping the last row will emit updates when there are duplicate rows on the
>>> same primary key, which means column value changes will notify downstream
>>> operators.
>>> The difference between keeping the first row and the last row is specified
>>> by the direction of the ORDER BY clause [1].
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>
>>>
>>> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <
>>> laurent.exste...@euranova.eu> wrote:
>>>
>>>> Thanks.
>>>>
>>>> I actually want the first row. What I want is a record to be forwarded
>>>> only if some of the columns change (of course keyed by the primary key). I
>>>> used rownum = 1, is that not selecting the first row?
>>>>
>>>> How can I adapt my query to let only the rows that effectively change the
>>>> values pass, as an append-only stream?
>>>>
>>>> If that is not possible, I'll look at converting it afterwards. But I
>>>> prefer a solution in the deduplication query.
>>>> The goal is to show my customer that what they want to achieve is very
>>>> straightforward in Flink SQL, so the simpler the queries the better. I need
>>>> to present my conclusions tomorrow.
>>>>
>>>> Thanks a lot already for your help!
>>>>
>>>> Best regards,
>>>>
>>>> Laurent.
>>>>
>>>>
>>>> On Thu, Nov 12, 2020, 03:43 Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi Laurent,
>>>>>
>>>>> 1. Deduplicating while keeping the first row will generate an append-only
>>>>> stream.
>>>>> But I guess you are expecting to keep the last row, which generates
>>>>> an updating stream. An alternative is to use the "changelog-json" format
>>>>> from this repo [1]; it will convert the updating stream into append-only
>>>>> records with a change flag encoded.
>>>>> 2. Yes. It will replace records with the same key, i.e. an upsert
>>>>> statement.
>>>>> 3. I think it will be available in one or two months. There will be a
>>>>> first release candidate soon. You can watch the dev ML.
>>>>> I'm not sure about the plan for the Ververica platform, cc @Konstantin
>>>>> Knauf <konstan...@ververica.com>
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> [1]:
>>>>> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>>>
>>>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
>>>>> laurent.exste...@euranova.eu> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> thanks for your quick reply. I was indeed expecting it.
>>>>>>
>>>>>> But that triggers the following questions:
>>>>>>
>>>>>> 1. Is there another way to do this deduplication and generate an
>>>>>> append-only stream? Match Recognize? UDF? ...?
>>>>>> 2. If I were to use Postgres as a sink, what would happen? Would the
>>>>>> events be appended, or would they replace the record with the same key?
>>>>>> 3. When will release-1.12 be available? And when would it be
>>>>>> integrated in the Ververica platform?
>>>>>>
>>>>>> Thanks a lot for your help!
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Laurent.
>>>>>>
>>>>>>
>>>>>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Laurent,
>>>>>>>
>>>>>>> This is because the deduplicate node generates an updating stream,
>>>>>>> however Kafka currently only supports append-only streams.
>>>>>>> This can be addressed in release-1.12, because we introduce a new
>>>>>>> connector "upsert-kafka" which supports writing updating
>>>>>>> streams into Kafka compacted topics.
>>>>>>>
>>>>>>> Does the "Kafka ingestion date" refer to the "kafka message timestamp",
>>>>>>> i.e. ConsumerRecord#timestamp()?
>>>>>>> If yes, this is also supported in release-1.12 via the metadata syntax
>>>>>>> in DDL [1]:
>>>>>>>
>>>>>>> CREATE TABLE kafka_table (
>>>>>>>   id BIGINT,
>>>>>>>   name STRING,
>>>>>>>   `timestamp` BIGINT METADATA  -- read the Kafka record timestamp
>>>>>>> ) WITH (
>>>>>>>   'connector' = 'kafka',
>>>>>>>   'topic' = 'test-topic',
>>>>>>>   'format' = 'avro'
>>>>>>> )
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> [1]:
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>>>>>
>>>>>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>>>>>> laurent.exste...@euranova.eu> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm getting an error in Flink SQL when reading from Kafka,
>>>>>>>> deduplicating records and sending them back to Kafka.
>>>>>>>>
>>>>>>>> The behavior I want is the following:
>>>>>>>>
>>>>>>>> *input:*
>>>>>>>> | client_number | address |
>>>>>>>> | ------------- | ------- |
>>>>>>>> | 1             | addr1   |
>>>>>>>> | 1             | addr1   |
>>>>>>>> | 1             | addr2   |
>>>>>>>> | 1             | addr2   |
>>>>>>>> | 1             | addr1   |
>>>>>>>> | 1             | addr1   |
>>>>>>>>
>>>>>>>> *output:*
>>>>>>>> | client_number | address |
>>>>>>>> | ------------- | ------- |
>>>>>>>> | 1             | addr1   |
>>>>>>>> | 1             | addr2   |
>>>>>>>> | 1             | addr1   |
>>>>>>>>
>>>>>>>> The error seems to say that the type of stream created by the
>>>>>>>> deduplication query is of "update & delete" type, while Kafka only
>>>>>>>> supports append-only:
>>>>>>>>
>>>>>>>> Unsupported query
>>>>>>>> Table sink 'vvp.default.sat_customers_address' doesn't support
>>>>>>>> consuming update and delete changes which is produced by node
>>>>>>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>>>>>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>>>>>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>>>>>
>>>>>>>> --> Is there a way to create an append-only query from this kind of
>>>>>>>> deduplication query (see my code below)?
>>>>>>>> --> Would that work if I used, say, a Postgres sink?
>>>>>>>>
>>>>>>>> Bonus question: can we extract the Kafka ingestion date using Flink
>>>>>>>> SQL? (Here I generated a processing date to allow ordering during
>>>>>>>> deduplication.)
>>>>>>>>
>>>>>>>> P.S.: I'm on the Ververica Platform, but I guess this error is
>>>>>>>> linked to Flink SQL itself.
>>>>>>>>
>>>>>>>> Thanks in advance for your help.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>>
>>>>>>>> Laurent.
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Read from customers kafka topic
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY TABLE customers (
>>>>>>>>   `client_number` INT,
>>>>>>>>   `name` VARCHAR(100),
>>>>>>>>   `address` VARCHAR(100)
>>>>>>>> )
>>>>>>>> COMMENT ''
>>>>>>>> WITH (
>>>>>>>>   'connector' = 'kafka',
>>>>>>>>   'format' = 'csv',
>>>>>>>>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>>>>>>   'properties.group.id' = 'flinkSQL',
>>>>>>>>   'topic' = 'customers',
>>>>>>>>   'csv.field-delimiter' = ';',
>>>>>>>>   'scan.startup.mode' = 'earliest-offset'
>>>>>>>> );
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Add metadata
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY VIEW metadata AS
>>>>>>>> SELECT *
>>>>>>>>   , sha256(cast(client_number AS STRING)) AS customer_pk
>>>>>>>>   , current_timestamp AS load_date
>>>>>>>>   , 'Kafka topic: customers' AS record_source
>>>>>>>> FROM customers;
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Deduplicate addresses
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY VIEW dedup_address AS
>>>>>>>> SELECT customer_pk
>>>>>>>>   , client_number
>>>>>>>>   , load_date
>>>>>>>>   , address
>>>>>>>> FROM (
>>>>>>>>   SELECT customer_pk
>>>>>>>>     , client_number
>>>>>>>>     , load_date
>>>>>>>>     , record_source
>>>>>>>>     , address
>>>>>>>>     , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number,
>>>>>>>>       address ORDER BY load_date ASC) AS rownum
>>>>>>>>   FROM metadata
>>>>>>>> ) WHERE rownum = 1;
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Send to sat_customers_address kafka topic
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>>>>>   `customer_pk` VARCHAR(64),
>>>>>>>>   `client_number` INT,
>>>>>>>>   `address` VARCHAR(100)
>>>>>>>> )
>>>>>>>> COMMENT ''
>>>>>>>> WITH (
>>>>>>>>   'connector' = 'kafka',
>>>>>>>>   'format' = 'csv',
>>>>>>>>   'properties.bootstrap.servers' =
>>>>>>>>     'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>>>>>   'properties.group.id' = 'flinkSQL',
>>>>>>>>   'topic' = 'sat_customers_address'
>>>>>>>> );
>>>>>>>>
>>>>>>>> INSERT INTO sat_customers_address
>>>>>>>> SELECT customer_pk
>>>>>>>>   , client_number
>>>>>>>>   , address
>>>>>>>> FROM dedup_address;

-- 
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00
*euranova.eu <http://euranova.eu/>*
*research.euranova.eu* <http://research.euranova.eu/>

-- 
♻ Be green, keep it on the screen