Hi Laurent,

With respect to Ververica Platform: we will support Flink 1.12 and add "upsert-kafka" as a packaged connector in our next minor release, which we are targeting for February.

Cheers,
Konstantin
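Since the replies below revolve around the new "upsert-kafka" connector, here is a minimal sketch of what the sink table from the original question further down could look like on Flink 1.12. The option names ('key.format', 'value.format') and the choice of primary key are assumptions to verify against the 1.12 documentation; the PRIMARY KEY declares the upsert key, which also becomes the Kafka message key used for log compaction.

CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED  -- illustrative upsert key
) WITH (
  'connector' = 'upsert-kafka',                        -- new in Flink 1.12
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'key.format' = 'csv',                                -- key and value are serialized separately
  'value.format' = 'csv'
);

With such a sink, the updating stream produced by the deduplication query is written as upserts per key instead of being rejected.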
On Thu, Nov 12, 2020 at 3:43 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Laurent,
>
> 1. Deduplicate with keeping the first row will generate an append-only
> stream. But I guess you are expecting to keep the last row, which
> generates an updating stream. An alternative is to use the
> "changelog-json" format from this repo [1]; it will convert the updating
> stream into append-only records with the change flag encoded.
> 2. Yes. It will replace records with the same key, i.e. an upsert statement.
> 3. I think it will be available in one or two months. There will be a
> first release candidate soon. You can follow the progress on the dev ML.
> I'm not sure about the plan for the Ververica Platform,
> cc @Konstantin Knauf <konstan...@ververica.com>
>
> Best,
> Jark
>
> [1]:
> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>
> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>
>> Hi Jark,
>>
>> Thanks for your quick reply. I was indeed expecting it.
>>
>> But that triggers the following questions:
>>
>> 1. Is there another way to do this deduplication and generate an
>> append-only stream? MATCH_RECOGNIZE? A UDF? ...?
>> 2. If I were to use Postgres as a sink, what would happen? Would the
>> events be appended, or would they replace the record with the same key?
>> 3. When will release-1.12 be available? And when would it be
>> integrated into the Ververica Platform?
>>
>> Thanks a lot for your help!
>>
>> Best Regards,
>>
>> Laurent.
>>
>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Laurent,
>>>
>>> This is because the deduplicate node generates an updating stream,
>>> while the Kafka connector currently only supports append-only streams.
>>> This is addressed in release-1.12, where we introduce a new connector,
>>> "upsert-kafka", which supports writing updating streams into Kafka
>>> compacted topics.
>>>
>>> Does the "Kafka ingestion date" refer to the Kafka message timestamp,
>>> i.e. ConsumerRecord#timestamp()?
>>> If yes, this is also supported in release-1.12 via the metadata syntax
>>> in DDL [1]:
>>>
>>> CREATE TABLE kafka_table (
>>>   id BIGINT,
>>>   name STRING,
>>>   `timestamp` BIGINT METADATA  -- read the record timestamp
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'test-topic',
>>>   'format' = 'avro'
>>> )
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
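To make Jark's point 1 above concrete: the first-row deduplication pattern produces an append-only stream when it is ordered by a processing-time attribute; ordering by a regular column such as the CURRENT_TIMESTAMP-based load_date is planned as a general Rank node instead, which is the updating stream reported in the error below. A minimal sketch, assuming a processing-time attribute is added to the source table from the original post (the proc_time column name is illustrative; the view keeps the same first-row-per-key semantics as the original rownum = 1 query):

CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100),
  `proc_time` AS PROCTIME()  -- processing-time attribute used for ordering
) WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);

-- Keep the first row per (client_number, address); ordering by the time
-- attribute lets the planner emit an append-only stream for rownum = 1.
CREATE TEMPORARY VIEW dedup_address AS
SELECT client_number, address
FROM (
  SELECT client_number, address,
         ROW_NUMBER() OVER (PARTITION BY client_number, address
                            ORDER BY proc_time ASC) AS rownum
  FROM customers
) WHERE rownum = 1;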
>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm getting an error in Flink SQL when reading from Kafka, deduplicating
>>>> records and sending them back to Kafka.
>>>>
>>>> The behavior I want is the following:
>>>>
>>>> *input:*
>>>> | client_number | address |
>>>> | ------------- | ------- |
>>>> | 1             | addr1   |
>>>> | 1             | addr1   |
>>>> | 1             | addr2   |
>>>> | 1             | addr2   |
>>>> | 1             | addr1   |
>>>> | 1             | addr1   |
>>>>
>>>> *output:*
>>>> | client_number | address |
>>>> | ------------- | ------- |
>>>> | 1             | addr1   |
>>>> | 1             | addr2   |
>>>> | 1             | addr1   |
>>>>
>>>> The error seems to say that the stream created by the deduplication
>>>> query is of "update & delete" type, while Kafka only supports append-only:
>>>>
>>>> Unsupported query
>>>> Table sink 'vvp.default.sat_customers_address' doesn't support
>>>> consuming update and delete changes which is produced by node
>>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>
>>>> --> Is there a way to create an append-only query from this kind of
>>>> deduplication query (see my code here below)?
>>>> --> Would that work if I used, say, a Postgres sink?
>>>>
>>>> Bonus question: can we extract the Kafka ingestion date using Flink
>>>> SQL? (Here I generated a processing date to allow ordering during
>>>> deduplication.)
>>>>
>>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked
>>>> to Flink SQL itself.
>>>>
>>>> Thanks in advance for your help.
>>>>
>>>> Best Regards,
>>>>
>>>> Laurent.
>>>>
>>>> -----------------------------------
>>>> -- Read from customers kafka topic
>>>> -----------------------------------
>>>> CREATE TEMPORARY TABLE customers (
>>>>   `client_number` INT,
>>>>   `name` VARCHAR(100),
>>>>   `address` VARCHAR(100)
>>>> )
>>>> COMMENT ''
>>>> WITH (
>>>>   'connector' = 'kafka',
>>>>   'format' = 'csv',
>>>>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>>   'properties.group.id' = 'flinkSQL',
>>>>   'topic' = 'customers',
>>>>   'csv.field-delimiter' = ';',
>>>>   'scan.startup.mode' = 'earliest-offset'
>>>> );
>>>>
>>>> -----------------------------------
>>>> -- Add metadata
>>>> -----------------------------------
>>>> CREATE TEMPORARY VIEW metadata AS
>>>> SELECT *
>>>>   , SHA256(CAST(client_number AS STRING)) AS customer_pk
>>>>   , CURRENT_TIMESTAMP AS load_date
>>>>   , 'Kafka topic: customers' AS record_source
>>>> FROM customers;
>>>>
>>>> -----------------------------------
>>>> -- Deduplicate addresses
>>>> -----------------------------------
>>>> CREATE TEMPORARY VIEW dedup_address AS
>>>> SELECT customer_pk
>>>>   , client_number
>>>>   , load_date
>>>>   , address
>>>> FROM (
>>>>   SELECT customer_pk
>>>>     , client_number
>>>>     , load_date
>>>>     , record_source
>>>>     , address
>>>>     , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>>>>       ORDER BY load_date ASC) AS rownum
>>>>   FROM metadata
>>>> ) WHERE rownum = 1;
>>>>
>>>> -----------------------------------
>>>> -- Send to sat_customers_address kafka topic
>>>> -----------------------------------
>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>   `customer_pk` VARCHAR(64),
>>>>   `client_number` INT,
>>>>   `address` VARCHAR(100)
>>>> )
>>>> COMMENT ''
>>>> WITH (
>>>>   'connector' = 'kafka',
>>>>   'format' = 'csv',
>>>>   'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>   'properties.group.id' = 'flinkSQL',
>>>>   'topic' = 'sat_customers_address'
>>>> );
>>>>
>>>> INSERT INTO sat_customers_address
>>>> SELECT customer_pk
>>>>   , client_number
>>>>   , address
>>>> FROM dedup_address;
>>>>
>>>> --
>>>> *Laurent Exsteens*
>>>> Data Engineer
>>>> (M) +32 (0) 486 20 48 36
>>>>
>>>> *EURA NOVA*
>>>> Rue Emile Francqui, 4
>>>> 1435 Mont-Saint-Guibert
>>>> (T) +32 10 75 02 00
>>>> *euranova.eu <http://euranova.eu/>*
>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>> ♻ Be green, keep it on the screen
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>> *euranova.eu <http://euranova.eu/>*
>> *research.euranova.eu* <http://research.euranova.eu/>
>> ♻ Be green, keep it on the screen

--
Konstantin Knauf
https://twitter.com/snntrable
https://github.com/knaufk
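As a footnote to Jark's suggestion [1] in his Nov 12 reply: a minimal sketch of writing the updating deduplication result with the "changelog-json" format from the ververica/flink-cdc-connectors repo. This assumes the format identifier 'changelog-json', that the corresponding format jar is on the classpath, and it reuses the dedup_address view from the original post; the table and topic names are hypothetical. Each change is then written as an append-only JSON record carrying the change flag, rather than being rejected by the sink.

CREATE TEMPORARY TABLE sat_customers_address_changelog (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100)
) WITH (
  'connector' = 'kafka',
  'topic' = 'sat_customers_address_changelog',   -- hypothetical topic name
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  'format' = 'changelog-json'                    -- encodes the row kind (+I/-U/+U/-D) in each record
);

INSERT INTO sat_customers_address_changelog
SELECT customer_pk
  , client_number
  , address
FROM dedup_address;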