Hi Laurent,

This is because the deduplication node produces an updating stream, whereas
the Kafka connector currently supports only append-only streams.
This can be addressed in release-1.12, where we introduce a new connector,
"upsert-kafka", which supports writing updating streams into Kafka compacted
topics.
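
As a rough sketch of what the sink could look like with that connector
(assuming release-1.12 or later; the choice of primary key columns below is
only an assumption for illustration, and the other options are copied from
your sink definition):

```sql
-- Sketch only: assumes Flink 1.12+ with the upsert-kafka connector on the classpath.
CREATE TEMPORARY TABLE sat_customers_address (
  `customer_pk` VARCHAR(64),
  `client_number` INT,
  `address` VARCHAR(100),
  -- upsert-kafka requires a primary key; these key columns are an assumption
  PRIMARY KEY (`customer_pk`, `address`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sat_customers_address',
  'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
  -- the key and the value are serialized separately
  'key.format' = 'csv',
  'value.format' = 'csv'
);
```

The primary key columns are written as the Kafka message key, so updates for
the same key overwrite each other once the topic is compacted.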

Does "Kafka ingestion date" refer to the Kafka message timestamp, i.e.
ConsumerRecord#timestamp()?
If yes, this is also supported in release-1.12 via the metadata syntax in DDL
[1]:

CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  `timestamp` BIGINT METADATA  -- read the Kafka record timestamp
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'avro'
)
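
Applied to your source table, a sketch could look like the following. The
column name kafka_ts is hypothetical, and the declared type is what I would
expect the Kafka connector to expose for the 'timestamp' metadata key in
release-1.12, so please check it against the connector documentation:

```sql
-- Sketch only: assumes Flink 1.12+; `kafka_ts` is a hypothetical column name.
CREATE TEMPORARY TABLE customers (
  `client_number` INT,
  `name` VARCHAR(100),
  `address` VARCHAR(100),
  -- expose the Kafka record timestamp under a different column name
  `kafka_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
  'properties.group.id' = 'flinkSQL',
  'topic' = 'customers',
  'csv.field-delimiter' = ';',
  'scan.startup.mode' = 'earliest-offset'
);
```

Such a column could then take the place of the generated load_date when
ordering rows during deduplication.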

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors

On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <laurent.exste...@euranova.eu>
wrote:

> Hello,
>
> I'm getting an error in Flink SQL when reading from Kafka, deduplicating
> records and sending them back to Kafka.
>
> The behavior I want is the following:
>
> *input:*
> | client_number | address |
> | ------------- | ------- |
> | 1             | addr1   |
> | 1             | addr1   |
> | 1             | addr2   |
> | 1             | addr2   |
> | 1             | addr1   |
> | 1             | addr1   |
>
> *output:*
> | client_number | address |
> | ------------- | ------- |
> | 1             | addr1   |
> | 1             | addr2   |
> | 1             | addr1   |
>
> The error seems to say that the stream created by the deduplication query
> is of the "update & delete" type, while Kafka only supports append-only
> streams:
>
> Unsupported query
> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
> update and delete changes which is produced by node
> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>
>
> --> Is there a way to create an append-only query from this kind of
> deduplication query (see my code below)?
> --> Would that work if I used, say, a Postgres sink?
>
> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
> (here I generated a processing date to allow ordering during deduplication)
>
> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
> Flink SQL itself.
>
> Thanks in advance for your help.
>
> Best Regards,
>
> Laurent.
>
> -----------------------------------
> -- Read from customers kafka topic
> -----------------------------------
> CREATE TEMPORARY TABLE customers (
> `client_number` INT,
> `name` VARCHAR(100),
> `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
> 'connector' = 'kafka',
> 'format' = 'csv',
> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
> 'properties.group.id' = 'flinkSQL',
> 'topic' = 'customers',
> 'csv.field-delimiter' = ';',
> 'scan.startup.mode' = 'earliest-offset'
> );
>
>
>
> -----------------------------------
> -- Add metadata
> -----------------------------------
> CREATE TEMPORARY VIEW metadata AS
> SELECT *
> , sha256(cast(client_number as STRING)) AS customer_pk
> , current_timestamp AS load_date
> , 'Kafka topic: customers' AS record_source
> FROM customers;
>
>
>
> -----------------------------------
> -- Deduplicate addresses
> -----------------------------------
> CREATE TEMPORARY VIEW dedup_address as
> SELECT customer_pk
> , client_number
> , load_date
> , address
> FROM (
> SELECT customer_pk
> , client_number
> , load_date
> , record_source
> , address
> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
> ORDER BY load_date ASC) AS rownum
> FROM metadata
> ) where rownum = 1;
>
>
>
>
>
>
> -----------------------------------
> -- Send to sat_customers_address kafka topic
> -----------------------------------
> CREATE TEMPORARY TABLE sat_customers_address (
> `customer_pk` VARCHAR(64),
> `client_number` INT,
> `address` VARCHAR(100)
> )
> COMMENT ''
> WITH (
> 'connector' = 'kafka',
> 'format' = 'csv',
> 'properties.bootstrap.servers' =
> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
> 'properties.group.id' = 'flinkSQL',
> 'topic' = 'sat_customers_address'
> );
>
> INSERT INTO sat_customers_address
> SELECT customer_pk
> , client_number
> , address
> FROM dedup_address;
>
>
>
>
