I'm now trying with a MATCH_RECOGNIZE query:
SELECT *
FROM customers
    MATCH_RECOGNIZE (
        PARTITION BY client_number
        ORDER BY proctime()
        MEASURES
            LAST(B.client_number) AS client_number,
            LAST(B.address) AS address
        PATTERN (A* B)
        DEFINE
            B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
    ) AS T;

However, I get the following error:

    SQL validation failed. Index 0 out of bounds for length 0

I've read the documentation and tried different formulations, but I don't manage to make this work. It should be possible to express what I need, though, since the examples in the documentation allow for far more complex patterns. What am I doing wrong?

I would still prefer something simpler, such as the deduplication query (if it can be made to work as I need it): we propose FlinkSQL to lower the expertise required for our jobs, and that becomes a much harder sell if a "simple" deduplication already needs a complex query involving CEP...

Thanks in advance for your help!

Best Regards,

Laurent.
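(For reference, the MATCH_RECOGNIZE examples in the documentation order by a time attribute column declared on the input table rather than by calling proctime() inside the clause. A reformulation along those lines would look roughly like the sketch below; it assumes the customers DDL is extended with a computed column `proc_time AS PROCTIME()`, a hypothetical name, and it is a sketch, not a verified fix for the validation error.)

SELECT *
FROM customers
    MATCH_RECOGNIZE (
        PARTITION BY client_number
        ORDER BY proc_time                  -- declared processing-time attribute, not proctime()
        MEASURES
            LAST(B.address) AS address      -- client_number is already exposed via PARTITION BY
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A* B)
        DEFINE
            B AS LAST(A.address, 1) IS NULL
                 OR B.address <> LAST(A.address, 1)
    ) AS T;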
On Thu, 12 Nov 2020 at 17:22, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:

> I see what my mistake was: I was using a field in my ORDER BY, while it only supports proctime() for now.
>
> That allows me to create an append-only stream, thanks a lot!
>
> However, it still does not allow me to do what I need:
>
> *If I use both my primary key and the changing column in PARTITION BY, then it does not allow me to come back to a previous value of my changed column:*
>
> SELECT client_number
>      , address
>      , proctime() AS load_date
> FROM (
>     SELECT client_number
>          , address
>          , ROW_NUMBER() OVER (PARTITION BY *client_number, address* ORDER BY proctime() ASC) AS rownum
>     FROM customers
> ) WHERE rownum = 1;
>
> *input:*                                       *output:*
> | client_number | address | load_date |        | client_number | address | load_date |
> | ------------- | ------- | --------- |        | ------------- | ------- | --------- |
> | 1             | addr1   | ts1       |  -->   | 1             | addr1   | ts1       |
> | 1             | addr1   | ts2       |
> | 1             | addr2   | ts3       |  -->   | 1             | addr2   | ts3       |
> | 1             | addr2   | ts4       |
> | 1             | addr1   | ts5       |  -->   | 1             | addr1   | ts5       |  <-- this one does not show --> I cannot change back to a previous value :(
> | 1             | addr1   | ts6       |
>
> *If however I only put my primary key in PARTITION BY, then I only get the first value of my changed column:*
>
> SELECT client_number
>      , address
>      , proctime() AS load_date
> FROM (
>     SELECT client_number
>          , address
>          , ROW_NUMBER() OVER (PARTITION BY *client_number* ORDER BY proctime() ASC) AS rownum
>     FROM customers
> ) WHERE rownum = 1;
>
> *input:*                                       *output:*
> | client_number | address | load_date |        | client_number | address | load_date |
> | ------------- | ------- | --------- |        | ------------- | ------- | --------- |
> | 1             | addr1   | ts1       |  -->   | 1             | addr1   | ts1       |
> | 1             | addr1   | ts2       |
> | 1             | addr2   | ts3       |  -->   | 1             | addr2   | ts3       |  <-- this one does not show :(
> | 1             | addr2   | ts4       |
> | 1             | addr1   | ts5       |  -->   | 1             | addr1   | ts5       |  <-- this one does not show :(
> | 1             | addr1   | ts6       |
>
> Basically, I need to deduplicate, *but only keeping in the deduplication state the latest value of the changed column* to compare with, while here it seems to keep all previous values...
>
> Is there a way to obtain the behavior I need (with this deduplication method or another one)?
>
> Thanks in advance.
>
> Best Regards,
>
> Laurent.
>
> On Thu, 12 Nov 2020 at 12:48, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Laurent,
>>
>> 1. Currently, it's impossible to convert a deduplication that keeps the last row into an append-only stream.
>> 2. Yes, I think the Ververica platform doesn't support the 'changelog-json' format natively.
>>
>> However, regarding your case, I think you can keep the first row on the client_number + address key:
>>
>> SELECT *
>> FROM (
>>     SELECT client_number, address, load_date,
>>            ROW_NUMBER() OVER
>>                (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
>>     FROM src)
>> WHERE rownum = 1
>>
>> That means the duplicate records on the same client_number + address will be ignored, but a new value of address will be emitted as an append-only stream.
>>
>> Hope this helps you.
>>
>> Best,
>> Jark
>>
>> On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>
>>> Hi Jark,
>>>
>>> thanks again for your quick response!
>>>
>>> I tried multiple variants of my query by:
>>> - specifying only the primary key in the PARTITION BY clause
>>> - changing the order to DESC to keep the last row
>>>
>>> --> I unfortunately always get the same error message.
>>> If I try to run a simple SELECT on the result of this query, I also get the following error: "The submitted query is not an append-only query. Only queries producing exclusively new rows over time are supported at the moment." So whatever change I make, I never get an append-only query --> Is there something I missed?
>>>
>>> I also tried to write to Kafka as changelog-json, but I got the answer: "The sink connector for table `vvp`.`default`.`sat_customers_address` could not be created. 'changelog-json' is not a supported sink format. Supported sink formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet']." (maybe because I'm on the Ververica platform?)
>>> This also seems to require an extra Kafka topic, so it is not ideal.
>>>
>>> *I'm starting to wonder if the deduplication query is really what I need.*
>>>
>>> What I need is:
>>> - to forward only the records where some columns (ideally configurable) change for a specific primary key;
>>> - in realtime (no windowing);
>>> - and have as a result an append-only stream.
>>>
>>> Like this:
>>>
>>> *input:*                                       *output* (this is what should ultimately be published to Kafka and later inserted into an RDBMS):
>>> | client_number | address | load_date |        | client_number | address | load_date |
>>> | ------------- | ------- | --------- |        | ------------- | ------- | --------- |
>>> | 1             | addr1   | ts1       |  -->   | 1             | addr1   | ts1       |
>>> | 1             | addr1   | ts2       |
>>> | 1             | addr2   | ts3       |  -->   | 1             | addr2   | ts3       |
>>> | 1             | addr2   | ts4       |
>>> | 1             | addr1   | ts5       |  -->   | 1             | addr1   | ts5       |
>>> | 1             | addr1   | ts6       |
>>>
>>> --> Is this deduplication query therefore the right fit?
>>>     - if yes, how should it be written to generate an append-only stream?
>>>     - if not, are there other options? (Match Recognize, UDF, ...?)
>>>
>>> Thanks a lot for your much appreciated help :).
>>>
>>> Best Regards,
>>>
>>> Laurent.
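(For comparison, the "keep last row" variant of the deduplication query that Jark describes in the next quoted message below, partitioned only on the primary key and ordered descending, would look roughly like the sketch here. As discussed further down in the thread, it produces an updating stream rather than an append-only one, so it needs a sink that can accept updates, e.g. the upsert-kafka connector from release-1.12.)

SELECT client_number, address, load_date
FROM (
    SELECT client_number
         , address
         , proctime() AS load_date
         , ROW_NUMBER() OVER (PARTITION BY client_number
                              ORDER BY proctime() DESC) AS rownum   -- DESC = keep the last row
    FROM customers
) WHERE rownum = 1;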
>>> On Thu, 12 Nov 2020 at 07:26, Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Laurent,
>>>>
>>>> > What I want is a record to be forwarded only if some of the columns change
>>>>
>>>> IIUC, what you want is still deduplication with the last row.
>>>> Keeping the first row will drop all the duplicate rows on the same primary key.
>>>> Keeping the last row will emit updates whenever a duplicate row arrives for the same primary key, which means column value changes will notify downstream operators.
>>>> The difference between keeping the first row and the last row is determined by the direction of the ORDER BY clause [1].
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>>
>>>> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>>>
>>>>> Thanks.
>>>>>
>>>>> I actually want the first row. What I want is a record to be forwarded only if some of the columns change (of course keyed by the primary key). I used rownum = 1, is that not selecting the first row?
>>>>>
>>>>> How can I adapt my query so that only the rows that effectively change the values pass through, as an append-only stream?
>>>>>
>>>>> If that is not possible, I'll look at converting it afterwards. But I would prefer a solution in the deduplication query itself.
>>>>> The goal is to show my customer that what they want to achieve is very straightforward in Flink SQL, so the simpler the queries the better. I need to present my conclusions tomorrow.
>>>>>
>>>>> Thanks a lot already for your help!
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Laurent.
>>>>>
>>>>> On Thu, Nov 12, 2020, 03:43 Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Laurent,
>>>>>>
>>>>>> 1. Deduplication keeping the first row will generate an append-only stream. But I guess you are expecting to keep the last row, which generates an updating stream. An alternative is to use the "changelog-json" format from this repo [1]; it converts the updating stream into append-only records with a change flag encoded.
>>>>>> 2. Yes. It will replace records with the same key, i.e. an upsert statement.
>>>>>> 3. I think it will be available in one or two months. There will be a first release candidate soon. You can watch the dev ML. I'm not sure about the plan for the Ververica platform, cc @Konstantin Knauf <konstan...@ververica.com>
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> [1]: https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>>>>
>>>>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> thanks for your quick reply. I was indeed expecting it.
>>>>>>>
>>>>>>> But that triggers the following questions:
>>>>>>>
>>>>>>> 1. Is there another way to do this deduplication and generate an append-only stream? Match Recognize? UDF? ...?
>>>>>>> 2. If I put Postgres as a sink, what would happen? Will the events be appended, or will they replace the record with the same key?
>>>>>>> 3. When will release-1.12 be available? And when would it be integrated into the Ververica platform?
>>>>>>>
>>>>>>> Thanks a lot for your help!
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Laurent.
>>>>>>>
>>>>>>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Laurent,
>>>>>>>>
>>>>>>>> This is because the deduplicate node generates an updating stream, whereas Kafka currently only supports append-only streams.
>>>>>>>> This can be addressed in release-1.12, because we introduce a new connector "upsert-kafka" which supports writing updating streams into Kafka compacted topics.
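(For illustration, an upsert-kafka sink as introduced in release-1.12 would be declared roughly as below. This is a sketch: the table name is made up, the topic and bootstrap servers are copied from the DDLs at the bottom of the thread, and the connector requires a primary key, which it writes as the Kafka message key. An INSERT INTO from the keep-last-row query above would then continuously upsert the latest address per client_number.)

CREATE TEMPORARY TABLE sat_customers_address_upsert (
    `client_number` INT,
    `address`       VARCHAR(100),
    PRIMARY KEY (`client_number`) NOT ENFORCED   -- upsert key, becomes the Kafka message key
) WITH (
    'connector' = 'upsert-kafka',                -- available from Flink 1.12 on
    'topic' = 'sat_customers_address',
    'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
    'key.format' = 'csv',
    'value.format' = 'csv'
);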
>>>>>>>> Does the "Kafka ingestion date" refer to the "kafka message timestamp", i.e. ConsumerRecord#timestamp()?
>>>>>>>> If yes, this is also supported in release-1.12 via the metadata syntax in DDL [1]:
>>>>>>>>
>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>   id BIGINT,
>>>>>>>>   name STRING,
>>>>>>>>   timestamp BIGINT METADATA  -- read timestamp
>>>>>>>> ) WITH (
>>>>>>>>   'connector' = 'kafka',
>>>>>>>>   'topic' = 'test-topic',
>>>>>>>>   'format' = 'avro'
>>>>>>>> )
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>>>>>>
>>>>>>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <laurent.exste...@euranova.eu> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka.
>>>>>>>>>
>>>>>>>>> The behavior I want is the following:
>>>>>>>>>
>>>>>>>>> *input:*
>>>>>>>>> | client_number | address |
>>>>>>>>> | ------------- | ------- |
>>>>>>>>> | 1             | addr1   |
>>>>>>>>> | 1             | addr1   |
>>>>>>>>> | 1             | addr2   |
>>>>>>>>> | 1             | addr2   |
>>>>>>>>> | 1             | addr1   |
>>>>>>>>> | 1             | addr1   |
>>>>>>>>>
>>>>>>>>> *output:*
>>>>>>>>> | client_number | address |
>>>>>>>>> | ------------- | ------- |
>>>>>>>>> | 1             | addr1   |
>>>>>>>>> | 1             | addr2   |
>>>>>>>>> | 1             | addr1   |
>>>>>>>>>
>>>>>>>>> The error seems to say that the stream created by the deduplication query is of "update & delete" type, while Kafka only supports append-only:
>>>>>>>>>
>>>>>>>>> Unsupported query
>>>>>>>>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming update and delete changes which is produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>>>>>>
>>>>>>>>> --> Is there a way to create an append-only query from this kind of deduplication query (see my code here below)?
>>>>>>>>> --> Would that work if I used, say, a Postgres sink?
>>>>>>>>>
>>>>>>>>> Bonus question: can we extract the Kafka ingestion date using Flink SQL? (Here I generated a processing date to allow ordering during deduplication.)
>>>>>>>>>
>>>>>>>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to Flink SQL itself.
>>>>>>>>>
>>>>>>>>> Thanks in advance for your help.
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>>
>>>>>>>>> Laurent.
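(Applied to the `customers` table defined in the DDL just below, the release-1.12 metadata syntax would look roughly like this sketch. The column name `kafka_ts` is made up, and the exact data type of the 'timestamp' metadata key should be checked against the 1.12 Kafka connector documentation.)

CREATE TEMPORARY TABLE customers (
    `client_number` INT,
    `name`          VARCHAR(100),
    `address`       VARCHAR(100),
    `kafka_ts` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'   -- Kafka record timestamp, i.e. the ingestion date
) WITH (
    'connector' = 'kafka',
    'format' = 'csv',
    'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
    'properties.group.id' = 'flinkSQL',
    'topic' = 'customers',
    'csv.field-delimiter' = ';',
    'scan.startup.mode' = 'earliest-offset'
);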
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Read from customers kafka topic
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY TABLE customers (
>>>>>>>>>   `client_number` INT,
>>>>>>>>>   `name` VARCHAR(100),
>>>>>>>>>   `address` VARCHAR(100)
>>>>>>>>> )
>>>>>>>>> COMMENT ''
>>>>>>>>> WITH (
>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>   'format' = 'csv',
>>>>>>>>>   'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>>>>>>>   'properties.group.id' = 'flinkSQL',
>>>>>>>>>   'topic' = 'customers',
>>>>>>>>>   'csv.field-delimiter' = ';',
>>>>>>>>>   'scan.startup.mode' = 'earliest-offset'
>>>>>>>>> );
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Add metadata
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY VIEW metadata AS
>>>>>>>>> SELECT *
>>>>>>>>>      , sha256(cast(client_number as STRING)) AS customer_pk
>>>>>>>>>      , current_timestamp AS load_date
>>>>>>>>>      , 'Kafka topic: customers' AS record_source
>>>>>>>>> FROM customers;
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Deduplicate addresses
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY VIEW dedup_address AS
>>>>>>>>> SELECT customer_pk
>>>>>>>>>      , client_number
>>>>>>>>>      , load_date
>>>>>>>>>      , address
>>>>>>>>> FROM (
>>>>>>>>>   SELECT customer_pk
>>>>>>>>>        , client_number
>>>>>>>>>        , load_date
>>>>>>>>>        , record_source
>>>>>>>>>        , address
>>>>>>>>>        , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address ORDER BY load_date ASC) AS rownum
>>>>>>>>>   FROM metadata
>>>>>>>>> ) WHERE rownum = 1;
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Send to sat_customers_address kafka topic
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>>>>>>   `customer_pk` VARCHAR(64),
>>>>>>>>>   `client_number` INT,
>>>>>>>>>   `address` VARCHAR(100)
>>>>>>>>> )
>>>>>>>>> COMMENT ''
>>>>>>>>> WITH (
>>>>>>>>>   'connector' = 'kafka',
>>>>>>>>>   'format' = 'csv',
>>>>>>>>>   'properties.bootstrap.servers' = 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>>>>>>   'properties.group.id' = 'flinkSQL',
>>>>>>>>>   'topic' = 'sat_customers_address'
>>>>>>>>> );
>>>>>>>>>
>>>>>>>>> INSERT INTO sat_customers_address
>>>>>>>>> SELECT customer_pk
>>>>>>>>>      , client_number
>>>>>>>>>      , address
>>>>>>>>> FROM dedup_address;
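(On the Postgres question above, Jark's answer was that it would replace records with the same key: a JDBC sink with a declared primary key writes upserts. A sketch follows, with placeholder URL and credentials; an INSERT INTO this table would then update the row with the same customer_pk in Postgres rather than appending.)

CREATE TEMPORARY TABLE sat_customers_address_pg (
    `customer_pk` VARCHAR(64),
    `client_number` INT,
    `address` VARCHAR(100),
    PRIMARY KEY (`customer_pk`) NOT ENFORCED   -- with a primary key, the JDBC sink upserts instead of appending
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres-host:5432/warehouse',   -- placeholder
    'table-name' = 'sat_customers_address',
    'username' = '...',                                         -- placeholder
    'password' = '...'                                          -- placeholder
);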
--
*Laurent Exsteens*
Data Engineer
(M) +32 (0) 486 20 48 36

*EURA NOVA*

Rue Emile Francqui, 4

1435 Mont-Saint-Guibert

(T) +32 10 75 02 00

*euranova.eu <http://euranova.eu/>*

*research.euranova.eu* <http://research.euranova.eu/>

--
♻ Be green, keep it on the screen