I'm now trying with a MATCH_RECOGNIZE:

SELECT *
FROM customers
MATCH_RECOGNIZE (
  PARTITION BY client_number
  ORDER BY proctime()
  MEASURES
    LAST(B.client_number) AS client_number,
    LAST(B.address) AS address
  PATTERN (A* B)
  DEFINE
    B AS LAST(A.address, 1) IS NULL OR B.address <> LAST(A.address, 1)
) AS T;


However, I get the following error:
SQL validation failed. Index 0 out of bounds for length 0

I've read the documentation and tried different formulations, but I can't
get this to work.
However, it should be possible to express what I need, since the examples in
the documentation allow for far more complex patterns.
What am I doing wrong?
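
To make the target semantics concrete, here is a minimal sketch in plain Python (not Flink code). The function names keep_first_row and emit_on_change are hypothetical, and keep_first_row is only an assumed model of how the ROW_NUMBER() ... WHERE rownum = 1 queries quoted below behave: it remembers every key it has ever seen. emit_on_change is the behavior I am after: it remembers only the latest address per client_number.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, Set, Tuple

# (client_number, address, load_date)
Row = Tuple[int, str, str]


def keep_first_row(rows: Iterable[Row], key: Callable[[Row], Hashable]) -> Iterator[Row]:
    """Assumed model of first-row deduplication: emit the first row per key, drop the rest."""
    seen: Set[Hashable] = set()
    for row in rows:
        k = key(row)
        if k not in seen:
            seen.add(k)
            yield row


def emit_on_change(rows: Iterable[Row]) -> Iterator[Row]:
    """Desired behavior: emit a row only when the address differs from the previous one for that client."""
    last_address: Dict[int, str] = {}
    for client_number, address, load_date in rows:
        if client_number not in last_address or last_address[client_number] != address:
            last_address[client_number] = address
            yield (client_number, address, load_date)


rows = [
    (1, "addr1", "ts1"),
    (1, "addr1", "ts2"),
    (1, "addr2", "ts3"),
    (1, "addr2", "ts4"),
    (1, "addr1", "ts5"),
    (1, "addr1", "ts6"),
]

# PARTITION BY client_number, address: the return to addr1 at ts5 is dropped.
by_client_and_address = list(keep_first_row(rows, key=lambda r: (r[0], r[1])))
# PARTITION BY client_number: only the very first row survives.
by_client = list(keep_first_row(rows, key=lambda r: r[0]))
# What I need: ts1, ts3 and ts5.
wanted = list(emit_on_change(rows))

print(by_client_and_address)
print(by_client)
print(wanted)
```

The only difference between the two functions is the state they keep: a set of all keys seen so far versus the single latest address per client. That difference is what I am trying to express in SQL.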

I would still prefer something simpler, such as the deduplication query (if it
could work as I need it to). Proposing Flink SQL as a way to lower the expertise
required for our jobs becomes a much harder sell if a "simple" deduplication
already needs a complex query involving CEP.

Thanks in advance for your help!

Best Regards,

Laurent.



On Thu, 12 Nov 2020 at 17:22, Laurent Exsteens <laurent.exste...@euranova.eu>
wrote:

> I see what my mistake was: I was using a field in my ORDER BY, while it
> only supports proctime() for now.
>
> That allows me to create an append only stream, thanks a lot!
>
> However, it still does not allow me to do what I need:
>
>
> *If I use both my primary key and changing column in PARTITION BY, then it
> does not allow me to come back to a previous value of my changed column:*
> SELECT client_number
>      , address
>      , proctime() AS load_date
> FROM (
>   SELECT client_number
>        , address
>        , ROW_NUMBER() OVER (PARTITION BY client_number, address
>                             ORDER BY proctime() ASC) AS rownum
>   FROM customers
> ) WHERE rownum = 1;
> *input:*                                         *output:*
> | client_number | address | load_date |          | client_number | address | load_date |
> | ------------- | ------- | --------- |          | ------------- | ------- | --------- |
> | 1             | addr1   | ts1       |   -->    | 1             | addr1   | ts1       |
> | 1             | addr1   | ts2       |
> | 1             | addr2   | ts3       |   -->    | 1             | addr2   | ts3       |
> | 1             | addr2   | ts4       |
> | 1             | addr1   | ts5       |   -->    | 1             | addr1   | ts5       |  <-- this one does not show
> | 1             | addr1   | ts6       |
>
> *--> I cannot change back to a previous value :(*
>
>
>
>
>
> *If however I only put my primary key in PARTITION BY, then I only get the
> first value of my changed column:*
> SELECT client_number
>      , address
>      , proctime() AS load_date
> FROM (
>   SELECT client_number
>        , address
>        , ROW_NUMBER() OVER (PARTITION BY client_number
>                             ORDER BY proctime() ASC) AS rownum
>   FROM customers
> ) WHERE rownum = 1;
> *input:*                                         *output:*
> | client_number | address | load_date |          | client_number | address | load_date |
> | ------------- | ------- | --------- |          | ------------- | ------- | --------- |
> | 1             | addr1   | ts1       |   -->    | 1             | addr1   | ts1       |
> | 1             | addr1   | ts2       |
> | 1             | addr2   | ts3       |   -->    | 1             | addr2   | ts3       |  <-- this one does not show :(
> | 1             | addr2   | ts4       |
> | 1             | addr1   | ts5       |   -->    | 1             | addr1   | ts5       |  <-- this one does not show :(
> | 1             | addr1   | ts6       |
>
>
>
> Basically, I need to deduplicate, *but only keeping in the deduplication
> state the latest value of the changed column* to compare with. While here
> it seems to keep all previous values...
>
> Is there a way to obtain the behavior I need (with this deduplication
> method or another one)?
>
> Thanks in advance.
>
> Best Regards,
>
> Laurent.
>
>
> On Thu, 12 Nov 2020 at 12:48, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Laurent,
>>
>> 1. Currently, it's impossible to convert deduplicate with last row into
>> an append-only stream.
>> 2. Yes, I think Ververica platform doesn't support 'changelog-json'
>> format natively.
>>
>> However, regarding your case, I think you can use keep first row on
>> client_number+address key.
>>
>> SELECT *
>> FROM (
>>    SELECT client_number, address, load_date,
>>      ROW_NUMBER() OVER
>>        (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
>>    FROM src)
>> WHERE rownum = 1
>>
>> That means, the duplicate records on the same client_number + address
>> will be ignored,
>> but the new value of address will be emitted as an append-only stream.
>>
>> Hope this helps you.
>>
>> Best,
>> Jark
>>
>>
>> On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <
>> laurent.exste...@euranova.eu> wrote:
>>
>>> Hi Jark,
>>>
>>> thanks again for your quick response!
>>>
>>> I tried multiple variants of my query by:
>>> - specifying only the primary key in the PARTITION BY clause
>>> - changing the order to DESC to keep the last row
>>>
>>> --> I unfortunately always get the same error message.
>>> If I try to make a simple select on the result of this query, I also get
>>> the following error: The submitted query is not an append-only query.
>>> Only queries producing exclusively new rows over time are supported at the
>>> moment. So whatever change I make, I never get an append-only query -->
>>> Is there something I missed?
>>>
>>> I also tried to write to kafka as changelog-json, but I got the answer: The
>>> sink connector for table `vvp`.`default`.`sat_customers_address` could not
>>> be created. 'changelog-json' is not a supported sink format. Supported sink
>>> formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
>>> (maybe because I'm on the Ververica platform?)
>>> This also seems to require an extra Kafka topic, so it is not ideal.
>>>
>>>
>>> *I'm starting to wonder if the deduplication query is really what I
>>> need.*
>>>
>>> What I need is:
>>> - to forward only the records where some columns (ideally configurable)
>>> change for a specific primary key.
>>> - in realtime (no windowing)
>>> - and have as a result an append-only stream.
>>>
>>> Like this:
>>>
>>> *input:*                                         *output:*
>>> | client_number | address | load_date |          | client_number | address | load_date |
>>> | ------------- | ------- | --------- |          | ------------- | ------- | --------- |
>>> | 1             | addr1   | ts1       |   -->    | 1             | addr1   | ts1       |
>>> | 1             | addr1   | ts2       |
>>> | 1             | addr2   | ts3       |   -->    | 1             | addr2   | ts3       |
>>> | 1             | addr2   | ts4       |
>>> | 1             | addr1   | ts5       |   -->    | 1             | addr1   | ts5       |
>>> | 1             | addr1   | ts6       |
>>>
>>> (The output is what should ultimately be published to Kafka and later
>>> inserted into an RDBMS.)
>>>
>>>
>>> --> is this deduplication query the right fit therefore?
>>>      - if yes, how should it be written to generate an append-only
>>> stream?
>>>      - If not, are there other options? (Match Recognize, UDF, ....?)
>>>
>>> Thanks a lot for your much appreciated help :).
>>>
>>> Best Regards,
>>>
>>> Laurent.
>>>
>>>
>>> On Thu, 12 Nov 2020 at 07:26, Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Laurent,
>>>>
>>>> > What I want is a record to be forwarded only if some of the columns
>>>> change
>>>>
>>>> IIUC, what you want is still deduplication with the last row.
>>>> Keeping the first row will drop all the duplicate rows on the same primary
>>>> key.
>>>> Keeping the last row will emit updates whenever duplicate rows arrive on the
>>>> same primary key, which means column value changes will notify downstream
>>>> operators.
>>>> The difference between keeping the first row and the last row is specified
>>>> by the direction of the ORDER BY clause [1].
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <
>>>> laurent.exste...@euranova.eu> wrote:
>>>>
>>>>> Thanks.
>>>>>
>>>>> I actually want the first row. What I want is a record to be forwarded
>>>>> only if some of the columns change (of course keyed by the primary key). I
>>>>> used rownum = 1, is that not selecting the first row?
>>>>>
>>>>> How can I adapt my query to let only the row effectively changing the
>>>>> values pass, as an append only stream?
>>>>>
>>>>> If not possible, I'll look at converting it after. But I prefer a
>>>>> solution in the deduplication query.
>>>>> The goal is to show my customer that what they want to achieve is very
>>>>> straightforward in flink SQL, so the simpler the queries the better. I
>>>>> need to present my conclusions tomorrow.
>>>>>
>>>>> Thanks a lot already for your help!
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Laurent.
>>>>>
>>>>>
>>>>> On Thu, Nov 12, 2020, 03:43 Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Laurent,
>>>>>>
>>>>>> 1. Deduplication that keeps the first row will generate an
>>>>>> append-only stream. But I guess you are expecting to keep the last row,
>>>>>> which generates an updating stream. An alternative is to use the
>>>>>> "changelog-json" format in this repo [1]; it will convert the updating
>>>>>> stream into append records with the change flag encoded.
>>>>>> 2. Yes. It will replace records with the same key, i.e. an upsert
>>>>>> statement.
>>>>>> 3. I think it will be available in one or two months. There will be a
>>>>>> first release candidate soon.
>>>>>>     You can watch the dev ML. I'm not sure about the plans for the
>>>>>> Ververica platform, cc @Konstantin Knauf <konstan...@ververica.com>
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> [1]:
>>>>>> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>>>>
>>>>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
>>>>>> laurent.exste...@euranova.eu> wrote:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> thanks for your quick reply. I was indeed expecting it.
>>>>>>>
>>>>>>> But that triggers the following questions:
>>>>>>>
>>>>>>>    1. Is there another way to do this deduplication and generate an
>>>>>>>    append-only stream? Match Recognize? UDF? ...?
>>>>>>>    2. If I used Postgres as a sink, what would happen? Will
>>>>>>>    the events be appended or will they replace the record with the same key?
>>>>>>>    3. When will release-1.12 be available? And when would it be
>>>>>>>    integrated in the Ververica platform?
>>>>>>>
>>>>>>> Thanks a lot for your help!
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Laurent.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Laurent,
>>>>>>>>
>>>>>>>> This is because the deduplicate node generates an updating stream,
>>>>>>>> whereas Kafka currently only supports append-only streams.
>>>>>>>> This can be addressed in release-1.12, because we introduce a new
>>>>>>>> connector "upsert-kafka" which supports writing updating
>>>>>>>> streams into Kafka compacted topics.
>>>>>>>>
>>>>>>>> Does the "Kafka ingestion date" refer to "kafka message timestamp",
>>>>>>>> i.e. ConsumerRecord#timestamp()?
>>>>>>>> If yes, this is also supported in release-1.12 via metadata syntax
>>>>>>>> in DDL [1]:
>>>>>>>>
>>>>>>>> CREATE TABLE kafka_table (
>>>>>>>>   id BIGINT,
>>>>>>>>   name STRING,
>>>>>>>>   `timestamp` BIGINT METADATA  -- read the Kafka message timestamp
>>>>>>>> ) WITH (
>>>>>>>>   'connector' = 'kafka',
>>>>>>>>   'topic' = 'test-topic',
>>>>>>>>   'format' = 'avro'
>>>>>>>> )
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> [1]:
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>>>>>>
>>>>>>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>>>>>>> laurent.exste...@euranova.eu> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'm getting an error in Flink SQL when reading from Kafka,
>>>>>>>>> deduplicating records and sending them back to Kafka.
>>>>>>>>>
>>>>>>>>> The behavior I want is the following:
>>>>>>>>>
>>>>>>>>> *input:*
>>>>>>>>> | client_number | address |
>>>>>>>>> | ------------------- | ----------- |
>>>>>>>>> | 1                      | addr1     |
>>>>>>>>> | 1                      | addr1     |
>>>>>>>>> | 1                      | addr2     |
>>>>>>>>> | 1                      | addr2     |
>>>>>>>>> | 1                      | addr1     |
>>>>>>>>> | 1                      | addr1     |
>>>>>>>>>
>>>>>>>>> *output:*
>>>>>>>>> | client_number | address |
>>>>>>>>> | ------------------- | ----------- |
>>>>>>>>> | 1                      | addr1     |
>>>>>>>>> | 1                      | addr2     |
>>>>>>>>> | 1                      | addr1     |
>>>>>>>>>
>>>>>>>>> The error seems to say that the type of stream created by the
>>>>>>>>> deduplication query is of "update & delete" type, while kafka only
>>>>>>>>> supports append-only:
>>>>>>>>>
>>>>>>>>> Unsupported query
>>>>>>>>> Table sink 'vvp.default.sat_customers_address' doesn't support
>>>>>>>>> consuming update and delete changes which is produced by node
>>>>>>>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>>>>>>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address, $2],
>>>>>>>>> orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --> Is there a way to create an append only query from this kind
>>>>>>>>> of deduplication query (see my code here below)?
>>>>>>>>> --> Would that work if I would use, say, a Postgres sink?
>>>>>>>>>
>>>>>>>>> Bonus question: can we extract the Kafka ingestion date using
>>>>>>>>> Flink SQL? (here I generated a processing date to allow ordering
>>>>>>>>> during deduplication)
>>>>>>>>>
>>>>>>>>> P.S.: I'm on the Ververica Platform, but I guess this error is
>>>>>>>>> linked to Flink SQL itself.
>>>>>>>>>
>>>>>>>>> Thanks in advance for your help.
>>>>>>>>>
>>>>>>>>> Best Regards,
>>>>>>>>>
>>>>>>>>> Laurent.
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Read from customers kafka topic
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY TABLE customers (
>>>>>>>>> `client_number` INT,
>>>>>>>>> `name` VARCHAR(100),
>>>>>>>>> `address` VARCHAR(100)
>>>>>>>>> )
>>>>>>>>> COMMENT ''
>>>>>>>>> WITH (
>>>>>>>>> 'connector' = 'kafka',
>>>>>>>>> 'format' = 'csv',
>>>>>>>>> 'properties.bootstrap.servers' =
>>>>>>>>> 'kafka.vvp.svc.cluster.local:9092',
>>>>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>>>>> 'topic' = 'customers',
>>>>>>>>> 'csv.field-delimiter' = ';',
>>>>>>>>> 'scan.startup.mode' = 'earliest-offset'
>>>>>>>>> );
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Add metadata
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY VIEW metadata AS
>>>>>>>>> SELECT *
>>>>>>>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>>>>>>>> , current_timestamp AS load_date
>>>>>>>>> , 'Kafka topic: customers' AS record_source
>>>>>>>>> FROM customers;
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Deduplicate addresses
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY VIEW dedup_address as
>>>>>>>>> SELECT customer_pk
>>>>>>>>> , client_number
>>>>>>>>> , load_date
>>>>>>>>> , address
>>>>>>>>> FROM (
>>>>>>>>> SELECT customer_pk
>>>>>>>>> , client_number
>>>>>>>>> , load_date
>>>>>>>>> , record_source
>>>>>>>>> , address
>>>>>>>>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number,
>>>>>>>>> address ORDER BY load_date ASC) AS rownum
>>>>>>>>> FROM metadata
>>>>>>>>> ) where rownum = 1;
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -----------------------------------
>>>>>>>>> -- Send to sat_customers_address kafka topic
>>>>>>>>> -----------------------------------
>>>>>>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>>>>>> `customer_pk` VARCHAR(64),
>>>>>>>>> `client_number` INT,
>>>>>>>>> `address` VARCHAR(100)
>>>>>>>>> )
>>>>>>>>> COMMENT ''
>>>>>>>>> WITH (
>>>>>>>>> 'connector' = 'kafka',
>>>>>>>>> 'format' = 'csv',
>>>>>>>>> 'properties.bootstrap.servers' =
>>>>>>>>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>>>>> 'topic' = 'sat_customers_address'
>>>>>>>>> );
>>>>>>>>>
>>>>>>>>> INSERT INTO sat_customers_address
>>>>>>>>> SELECT customer_pk
>>>>>>>>> , client_number
>>>>>>>>> , address
>>>>>>>>> FROM dedup_address;
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> *Laurent Exsteens*
>>>>>>>>> Data Engineer
>>>>>>>>> (M) +32 (0) 486 20 48 36
>>>>>>>>>
>>>>>>>>> *EURA NOVA*
>>>>>>>>>
>>>>>>>>> Rue Emile Francqui, 4
>>>>>>>>>
>>>>>>>>> 1435 Mont-Saint-Guibert
>>>>>>>>>
>>>>>>>>> (T) +32 10 75 02 00
>>>>>>>>>
>>>>>>>>> *euranova.eu <http://euranova.eu/>*
>>>>>>>>>
>>>>>>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>>>>>>
>>>>>>>>> ♻ Be green, keep it on the screen
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>>
>
>


