Hello, I'm getting an error in Flink SQL when reading from Kafka, deduplicating records, and sending them back to Kafka.
The behavior I want is the following:

*input:*

| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr2   |
| 1             | addr1   |
| 1             | addr1   |

*output:*

| client_number | address |
| ------------- | ------- |
| 1             | addr1   |
| 1             | addr2   |
| 1             | addr1   |

The error seems to say that the stream produced by the deduplication query contains update and delete changes, while the Kafka sink only supports append-only streams:

```
Unsupported query
Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
```

--> Is there a way to create an append-only query from this kind of deduplication query (see my code below)?
--> Would it work if I used, say, a Postgres sink instead?

Bonus question: can we extract the Kafka ingestion date using Flink SQL? (Here I generated a processing date to allow ordering during deduplication.)

P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.

Thanks in advance for your help.

Best Regards,

Laurent.

```sql
-----------------------------------
-- Read from customers kafka topic
-----------------------------------
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name`          VARCHAR(100),
  `address`       VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);

-----------------------------------
-- Add metadata
-----------------------------------
CREATE TEMPORARY VIEW metadata AS
SELECT *
  , SHA256(CAST(client_number AS STRING)) AS customer_pk
  , CURRENT_TIMESTAMP AS load_date
  , 'Kafka topic: customers' AS record_source
FROM customers;

-----------------------------------
-- Deduplicate addresses
-----------------------------------
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk
  , client_number
  , load_date
  , address
FROM (
  SELECT customer_pk
    , client_number
    , load_date
    , record_source
    , address
    , ROW_NUMBER() OVER (
        PARTITION BY customer_pk, client_number, address
        ORDER BY load_date ASC) AS rownum
  FROM metadata
)
WHERE rownum = 1;

-----------------------------------
-- Send to sat_customers_address kafka topic
-----------------------------------
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk`   VARCHAR(64),
  `client_number` INT,
  `address`       VARCHAR(100)
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'sat_customers_address'
);

INSERT INTO sat_customers_address
SELECT customer_pk
  , client_number
  , address
FROM dedup_address;
```
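To make the first question a bit more concrete: one thing I was wondering is whether ordering the deduplication on a processing-time attribute (instead of the generated `load_date`) would let the planner produce an append-only stream. Below is a rough, untested sketch of what I mean; the `proc_time` computed column is something I added for illustration, and the assumption that "keep first row ordered by processing time" becomes append-only is exactly what I'd like to have confirmed:

```sql
-- Sketch (untested): expose a processing-time attribute on the source table
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name`          VARCHAR(100),
  `address`       VARCHAR(100),
  `proc_time` AS PROCTIME()  -- computed processing-time attribute (my assumption)
)
COMMENT ''
WITH (
  -- same Kafka connector options as in the original customers table above
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);

-- Sketch (untested): order the ROW_NUMBER() on the time attribute instead of load_date
CREATE TEMPORARY VIEW dedup_address AS
SELECT customer_pk
  , client_number
  , load_date
  , address
FROM (
  SELECT *
    , ROW_NUMBER() OVER (
        PARTITION BY customer_pk, client_number, address
        ORDER BY proc_time ASC) AS rownum
  FROM metadata
)
WHERE rownum = 1;
```

(The `metadata` view does a `SELECT *`, so `proc_time` should still be visible there.) Would that be recognized as an append-only deduplication, and would it still give the output I described above?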
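And to clarify the bonus question: what I'm hoping for is a way to read the timestamp of the Kafka record itself as a column on the source table, so I could use it as `load_date` instead of `current_timestamp`. Something like the metadata column below is what I have in mind; the `kafka_ts` name is mine, and I'm not sure the `'timestamp'` metadata key (or metadata columns at all) is available in the Flink version bundled with my platform:

```sql
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name`          VARCHAR(100),
  `address`       VARCHAR(100),
  -- Kafka record timestamp exposed as a read-only metadata column (assumption on my side)
  `kafka_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp' VIRTUAL
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);
```

If something like that works, I would then use `kafka_ts` for the ordering during deduplication instead of generating a processing date myself.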