Hi Laurent,

This is because the deduplicate node generates an updating stream, while Kafka currently only supports append-only streams. This will be addressed in release-1.12, which introduces a new connector, "upsert-kafka", that supports writing updating streams into Kafka compacted topics.
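For illustration, a 1.12 sink based on your existing table definition could look roughly like this (a minimal sketch; the primary key choice and the 'csv' key/value formats here are assumptions you may want to adapt):

CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  -- upsert-kafka requires a primary key; (customer_pk, address) is only an assumption here
  PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);

Note that 'upsert-kafka' declares key and value serialization via separate 'key.format' and 'value.format' options rather than a single 'format' option.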
Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e. ConsumerRecord#timestamp()? If yes, this is also supported in release-1.12 via metadata syntax in DDL [1]: CREATE TABLE kafka_table ( id BIGINT, name STRING, timestamp BIGINT METADATA, -- read timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'test-topic', 'format' = 'avro' ) Best, Jark [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <laurent.exste...@euranova.eu> wrote: > Hello, > > I'm getting an error in Flink SQL when reading from kafka, deduplicating > records and sending them back to Kafka. > > The behavior I want is the following: > > *input:* > | client_number | address | > | ------------------- | ----------- | > | 1 | addr1 | > | 1 | addr1 | > | 1 | addr2 | > | 1 | addr2 | > | 1 | addr1 | > | 1 | addr1 | > > *output:* > | client_number | address | > | ------------------- | ----------- | > | 1 | addr1 | > | 1 | addr2 | > | 1 | addr1 | > > The error seems to say that the type of stream created by the > deduplication query is of "update & delete" type, while kafka only supports > append-only: > > Unsupported query > Table sink 'vvp.default.sat_customers_address' doesn't support consuming > update and delete changes which is produced by node > Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, > $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3]) > > > --> Is there a way to create an append only query from this kind of > deduplication query (see my code here below)? > --> Would that work if I would use, say, a Postgres sink? > > Bonus question: can we extract the Kafka ingestion date using Flink SQL? > (here I generated a processing date to allow ordering during deduplication) > > P.S.: I'm on the Ververica Platform, but I guess this error is linked to > Flink SQL itself. > > Thanks in advance for your help. > > Best Regards, > > Laurent. 
>
> -----------------------------------
> -- Read from customers kafka topic
> -----------------------------------
> CREATE TEMPORARY TABLE customers (
>   `client_number` INT,
>   `name` VARCHAR(100),
>   `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
>   'connector' = 'kafka',
>   'format' = 'csv',
>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>   'properties.group.id' = 'flinkSQL',
>   'topic' = 'customers',
>   'csv.field-delimiter' = ';',
>   'scan.startup.mode' = 'earliest-offset'
> );
>
> -----------------------------------
> -- Add metadata
> -----------------------------------
> CREATE TEMPORARY VIEW metadata AS
> SELECT *
>      , sha256(cast(client_number as STRING)) AS customer_pk
>      , current_timestamp AS load_date
>      , 'Kafka topic: customers' AS record_source
> FROM customers;
>
> -----------------------------------
> -- Deduplicate addresses
> -----------------------------------
> CREATE TEMPORARY VIEW dedup_address AS
> SELECT customer_pk
>      , client_number
>      , load_date
>      , address
> FROM (
>   SELECT customer_pk
>        , client_number
>        , load_date
>        , record_source
>        , address
>        , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>                             ORDER BY load_date ASC) AS rownum
>   FROM metadata
> ) WHERE rownum = 1;
>
> -----------------------------------
> -- Send to sat_customers_address kafka topic
> -----------------------------------
> CREATE TEMPORARY TABLE sat_customers_address (
>   `customer_pk` VARCHAR(64),
>   `client_number` INT,
>   `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
>   'connector' = 'kafka',
>   'format' = 'csv',
>   'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>   'properties.group.id' = 'flinkSQL',
>   'topic' = 'sat_customers_address'
> );
>
> INSERT INTO sat_customers_address
> SELECT customer_pk
>      , client_number
>      , address
> FROM dedup_address;
>
> --
> Laurent Exsteens
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> EURA NOVA
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00
> euranova.eu <http://euranova.eu/>
> research.euranova.eu <http://research.euranova.eu/>
>
> ♻ Be green, keep it on the screen
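Regarding the bonus question above: once on 1.12, the Kafka record timestamp could also be exposed directly on the customers source table and used for the dedup ordering instead of the generated processing time. A rough sketch reusing the original table definition (the column name kafka_ts is only an illustration):

CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100),
  `kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp'  -- Kafka record timestamp; could replace the generated load_date
)
COMMENT ''
WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);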