Hi Jark, thanks for your quick reply. That is indeed the answer I was expecting.
But that triggers the following questions:

1. Is there another way to do this deduplication and generate an append-only stream? MATCH_RECOGNIZE? A UDF? ...?
2. If I used Postgres as a sink, what would happen? Would the events be appended, or would they replace the record with the same key?
3. When will release-1.12 be available? And when would it be integrated into the Ververica Platform?

Thanks a lot for your help!

Best Regards,

Laurent.

On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:

> Hi Laurent,
>
> This is because the deduplicate node generates an updating stream, however
> Kafka currently only supports append-only streams.
> This can be addressed in release-1.12, because we introduce a new
> connector "upsert-kafka" which supports writing updating
> streams into Kafka compacted topics.
>
> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
> ConsumerRecord#timestamp()?
> If yes, this is also supported in release-1.12 via metadata syntax in DDL
> [1]:
>
> CREATE TABLE kafka_table (
>   id BIGINT,
>   name STRING,
>   timestamp BIGINT METADATA  -- read timestamp
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-topic',
>   'format' = 'avro'
> )
>
> Best,
> Jark
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>
> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>
>> Hello,
>>
>> I'm getting an error in Flink SQL when reading from Kafka, deduplicating
>> records and sending them back to Kafka.
>>
>> The behavior I want is the following:
>>
>> *input:*
>> | client_number | address |
>> | ------------- | ------- |
>> | 1             | addr1   |
>> | 1             | addr1   |
>> | 1             | addr2   |
>> | 1             | addr2   |
>> | 1             | addr1   |
>> | 1             | addr1   |
>>
>> *output:*
>> | client_number | address |
>> | ------------- | ------- |
>> | 1             | addr1   |
>> | 1             | addr2   |
>> | 1             | addr1   |
>>
>> The error seems to say that the stream created by the deduplication
>> query is of "update & delete" type, while Kafka only supports append-only:
>>
>> Unsupported query
>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>
>> --> Is there a way to create an append-only query from this kind of
>> deduplication query (see my code below)?
>> --> Would that work if I used, say, a Postgres sink?
>>
>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>> (Here I generated a processing date to allow ordering during deduplication.)
>>
>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>> Flink SQL itself.
>>
>> Thanks in advance for your help.
>>
>> Best Regards,
>>
>> Laurent.
>>
>> -----------------------------------
>> -- Read from customers kafka topic
>> -----------------------------------
>> CREATE TEMPORARY TABLE customers (
>>   `client_number` INT,
>>   `name` VARCHAR(100),
>>   `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>>   'connector' = 'kafka',
>>   'format' = 'csv',
>>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>   'properties.group.id' = 'flinkSQL',
>>   'topic' = 'customers',
>>   'csv.field-delimiter' = ';',
>>   'scan.startup.mode' = 'earliest-offset'
>> );
>>
>> -----------------------------------
>> -- Add metadata
>> -----------------------------------
>> CREATE TEMPORARY VIEW metadata AS
>> SELECT *
>>   , sha256(cast(client_number as STRING)) AS customer_pk
>>   , current_timestamp AS load_date
>>   , 'Kafka topic: customers' AS record_source
>> FROM customers;
>>
>> -----------------------------------
>> -- Deduplicate addresses
>> -----------------------------------
>> CREATE TEMPORARY VIEW dedup_address AS
>> SELECT customer_pk
>>   , client_number
>>   , load_date
>>   , address
>> FROM (
>>   SELECT customer_pk
>>     , client_number
>>     , load_date
>>     , record_source
>>     , address
>>     , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>>         ORDER BY load_date ASC) AS rownum
>>   FROM metadata
>> ) WHERE rownum = 1;
>>
>> -----------------------------------
>> -- Send to sat_customers_address kafka topic
>> -----------------------------------
>> CREATE TEMPORARY TABLE sat_customers_address (
>>   `customer_pk` VARCHAR(64),
>>   `client_number` INT,
>>   `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>>   'connector' = 'kafka',
>>   'format' = 'csv',
>>   'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>   'properties.group.id' = 'flinkSQL',
>>   'topic' = 'sat_customers_address'
>> );
>>
>> INSERT INTO sat_customers_address
>> SELECT customer_pk
>>   , client_number
>>   , address
>> FROM dedup_address;
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>> *euranova.eu <http://euranova.eu/>*
>> *research.euranova.eu* <http://research.euranova.eu/>
>>
>> ♻ Be green, keep it on the screen
>
>

--
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*
Rue Emile Francqui, 4
1435 Mont-Saint-Guibert
(T) +32 10 75 02 00
*euranova.eu <http://euranova.eu/>*
*research.euranova.eu* <http://research.euranova.eu/>

--
♻ Be green, keep it on the screen
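P.S.: Regarding the "upsert-kafka" connector you mention: to check my understanding, I assume that once release-1.12 is available I could keep the same INSERT from dedup_address and only change the sink DDL to something roughly like the sketch below. The option names ('upsert-kafka', 'key.format', 'value.format') and the choice of primary key are my guesses, so please correct me if they are wrong:

CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED  -- assumed upsert key, written as the Kafka message key
) WITH (
  'connector' = 'upsert-kafka',  -- new connector in release-1.12 (name assumed)
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);

If I understand correctly, the updating stream produced by the deduplication would then be written as upserts into a compacted topic instead of being rejected.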
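P.P.S.: For question 2, what I had in mind is the plain JDBC connector pointing at Postgres, along these lines (connection details below are placeholders). My understanding is that with a PRIMARY KEY declared the JDBC sink writes in upsert mode, so records with the same key would be replaced rather than appended, but I'd be happy to be corrected:

CREATE TEMPORARY TABLE sat_customers_address_pg (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED  -- with a key the sink upserts; without it, it appends
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres:5432/dwh',  -- placeholder host/database
  'table-name' = 'sat_customers_address',
  'username' = '...',
  'password' = '...'
);

INSERT INTO sat_customers_address_pg
SELECT customer_pk
  , client_number
  , address
FROM dedup_address;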