Hi Laurent,

1. Deduplicating by keeping the first row generates an append-only stream, but I guess you actually want to keep the last row, which generates an updating stream. Alternatively, you can use the "changelog-json" format from this repo [1]; it converts the updating stream into append-only records with the change flag encoded.
2. Yes. It will replace records with the same key, i.e. behave like an upsert statement.
3. I think it will be available in one or two months; there will be a first release candidate soon. You can watch the dev ML. I'm not sure about the plan for the Ververica Platform, cc @Konstantin Knauf <konstan...@ververica.com>
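For example, with 1.12 the tail of your pipeline could look roughly like this (an untested sketch that reuses the view and topic names from your pipeline below; it keeps the last row per client_number, so the sink declares a matching primary key, and the exact options may still change before the release):

-- Keep the latest address per client; ORDER BY ... DESC makes this an updating stream.
CREATE TEMPORARY VIEW latest_address AS
SELECT customer_pk
     , client_number
     , address
FROM (
  SELECT *
       , ROW_NUMBER() OVER (PARTITION BY client_number
                            ORDER BY load_date DESC) AS rownum
  FROM metadata
) WHERE rownum = 1;

-- The new upsert-kafka connector can consume the updating stream;
-- the primary key columns become the Kafka message key.
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`client_number`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',
  'value.format' = 'csv'
);

INSERT INTO sat_customers_address
SELECT customer_pk, client_number, address FROM latest_address;

Written to a compacted topic keyed by client_number, each new address then simply replaces the previous one.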
Best,
Jark

[1]: https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:

> Hi Jark,
>
> thanks for your quick reply. I was indeed expecting it.
>
> But that triggers the following questions:
>
> 1. Is there another way to do this deduplication and generate an
>    append-only stream? Match Recognize? UDF? ...?
> 2. If I would put Postgres as a sink, what would happen? Will the
>    events be appended or will they replace the record with the same key?
> 3. When will release-1.12 be available? And when would it be
>    integrated in the Ververica platform?
>
> Thanks a lot for your help!
>
> Best Regards,
>
> Laurent.
>
> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Laurent,
>>
>> This is because the deduplicate node generates an updating stream,
>> however Kafka currently only supports append-only streams.
>> This is addressed in release-1.12, because we introduce a new
>> connector "upsert-kafka" which supports writing updating
>> streams into Kafka compacted topics.
>>
>> Does the "Kafka ingestion date" refer to the "kafka message timestamp",
>> i.e. ConsumerRecord#timestamp()?
>> If yes, this is also supported in release-1.12 via the metadata syntax
>> in DDL [1]:
>>
>> CREATE TABLE kafka_table (
>>   id BIGINT,
>>   name STRING,
>>   timestamp BIGINT METADATA  -- read timestamp
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'test-topic',
>>   'format' = 'avro'
>> )
>>
>> Best,
>> Jark
>>
>> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>
>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>
>>> Hello,
>>>
>>> I'm getting an error in Flink SQL when reading from Kafka,
>>> deduplicating records and sending them back to Kafka.
>>>
>>> The behavior I want is the following:
>>>
>>> *input:*
>>> | client_number | address |
>>> | ------------- | ------- |
>>> | 1             | addr1   |
>>> | 1             | addr1   |
>>> | 1             | addr2   |
>>> | 1             | addr2   |
>>> | 1             | addr1   |
>>> | 1             | addr1   |
>>>
>>> *output:*
>>> | client_number | address |
>>> | ------------- | ------- |
>>> | 1             | addr1   |
>>> | 1             | addr2   |
>>> | 1             | addr1   |
>>>
>>> The error seems to say that the type of stream created by the
>>> deduplication query is of "update & delete" type, while Kafka only
>>> supports append-only:
>>>
>>> Unsupported query
>>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>
>>> --> Is there a way to create an append-only query from this kind of
>>> deduplication query (see my code here below)?
>>> --> Would that work if I would use, say, a Postgres sink?
>>>
>>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>>> (here I generated a processing date to allow ordering during deduplication)
>>>
>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>>> Flink SQL itself.
>>>
>>> Thanks in advance for your help.
>>>
>>> Best Regards,
>>>
>>> Laurent.
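For the bonus question above (extracting the Kafka ingestion date): once on 1.12, the FLIP-107 metadata syntax referenced earlier in the thread can expose the Kafka record timestamp as a regular column. A minimal, untested sketch against the customers table defined below; the column name kafka_ts is just an example:

CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100),
  -- record timestamp set by Kafka, exposed as a metadata column (1.12+)
  `kafka_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);

That column could then replace the generated load_date as the ordering column in the deduplication.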
>>>
>>> -----------------------------------
>>> -- Read from customers kafka topic
>>> -----------------------------------
>>> CREATE TEMPORARY TABLE customers (
>>>   `client_number` INT,
>>>   `name` VARCHAR(100),
>>>   `address` VARCHAR(100)
>>> )
>>> COMMENT ''
>>> WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'csv',
>>>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>   'properties.group.id' = 'flinkSQL',
>>>   'topic' = 'customers',
>>>   'csv.field-delimiter' = ';',
>>>   'scan.startup.mode' = 'earliest-offset'
>>> );
>>>
>>> -----------------------------------
>>> -- Add metadata
>>> -----------------------------------
>>> CREATE TEMPORARY VIEW metadata AS
>>> SELECT *
>>>      , sha256(cast(client_number as STRING)) AS customer_pk
>>>      , current_timestamp AS load_date
>>>      , 'Kafka topic: customers' AS record_source
>>> FROM customers;
>>>
>>> -----------------------------------
>>> -- Deduplicate addresses
>>> -----------------------------------
>>> CREATE TEMPORARY VIEW dedup_address AS
>>> SELECT customer_pk
>>>      , client_number
>>>      , load_date
>>>      , address
>>> FROM (
>>>   SELECT customer_pk
>>>        , client_number
>>>        , load_date
>>>        , record_source
>>>        , address
>>>        , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>>>                             ORDER BY load_date ASC) AS rownum
>>>   FROM metadata
>>> ) WHERE rownum = 1;
>>>
>>> -----------------------------------
>>> -- Send to sat_customers_address kafka topic
>>> -----------------------------------
>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>   `customer_pk` VARCHAR(64),
>>>   `client_number` INT,
>>>   `address` VARCHAR(100)
>>> )
>>> COMMENT ''
>>> WITH (
>>>   'connector' = 'kafka',
>>>   'format' = 'csv',
>>>   'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>   'properties.group.id' = 'flinkSQL',
>>>   'topic' = 'sat_customers_address'
>>> );
>>>
>>> INSERT INTO sat_customers_address
>>> SELECT customer_pk
>>>      , client_number
>>>      , address
>>> FROM dedup_address;
>>>
>>> --
>>> *Laurent Exsteens*
>>> Data Engineer
>>> (M) +32 (0) 486 20 48 36
>>>
>>> *EURA NOVA*
>>> Rue Emile Francqui, 4
>>> 1435 Mont-Saint-Guibert
>>> (T) +32 10 75 02 00
>>> *euranova.eu <http://euranova.eu/>*
>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>
>>> ♻ Be green, keep it on the screen
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
> Rue Emile Francqui, 4
> 1435 Mont-Saint-Guibert
> (T) +32 10 75 02 00
> *euranova.eu <http://euranova.eu/>*
> *research.euranova.eu* <http://research.euranova.eu/>
>
> ♻ Be green, keep it on the screen