I see what my mistake was: I was using a field in my ORDER BY, while it
only supports proctime() for now.

That allows me to create an append only stream, thanks a lot!

However, it still does not allow me to do what I need:


*If I use both my primary key and changing column in PARTITION BY, then it
does not allow me to come back to a previous value of my changed column:*
SELECT client_number
     , address
     , proctime() AS load_date
FROM (
    SELECT client_number
         , address
         , ROW_NUMBER() OVER (PARTITION BY client_number, address
                              ORDER BY proctime() ASC) AS rownum
    FROM customers
) WHERE rownum = 1;
*input:*                                        *output:*
| client_number | address | load_date |         | client_number | address | load_date |
| ------------- | ------- | --------- |         | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |   -->   | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |   -->   | 1             | addr2   | ts3       |
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |   -->   | 1             | addr1   | ts5       |  <-- *this one does not show --> I cannot change back to a previous value :(*
| 1             | addr1   | ts6       |





*If however I only put my primary key in PARTITION BY, then I only get the
first value of my changed column:*
SELECT client_number
     , address
     , proctime() AS load_date
FROM (
    SELECT client_number
         , address
         , ROW_NUMBER() OVER (PARTITION BY client_number
                              ORDER BY proctime() ASC) AS rownum
    FROM customers
) WHERE rownum = 1;
*input:*                                        *output:*
| client_number | address | load_date |         | client_number | address | load_date |
| ------------- | ------- | --------- |         | ------------- | ------- | --------- |
| 1             | addr1   | ts1       |   -->   | 1             | addr1   | ts1       |
| 1             | addr1   | ts2       |
| 1             | addr2   | ts3       |   -->   | 1             | addr2   | ts3       |  <-- *this one does not show :(*
| 1             | addr2   | ts4       |
| 1             | addr1   | ts5       |   -->   | 1             | addr1   | ts5       |  <-- *this one does not show :(*
| 1             | addr1   | ts6       |
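
To make the two results above concrete, here is a small Python simulation of
keep-first-row deduplication. It is only a sketch and not Flink code: it
assumes the operator remembers every partition key it has ever seen, and the
name first_row_dedup and the sample rows are made up for illustration.

```python
from typing import Callable, Hashable, Iterable, List, Set, Tuple

Row = Tuple[int, str, str]  # (client_number, address, load_date)


def first_row_dedup(rows: Iterable[Row], key: Callable[[Row], Hashable]) -> List[Row]:
    """Emit only the first row seen per partition key (like rownum = 1, ORDER BY ... ASC)."""
    seen: Set[Hashable] = set()  # assumed state: every key ever seen is remembered
    out: List[Row] = []
    for row in rows:
        k = key(row)
        if k not in seen:
            seen.add(k)
            out.append(row)
    return out


rows = [
    (1, "addr1", "ts1"), (1, "addr1", "ts2"),
    (1, "addr2", "ts3"), (1, "addr2", "ts4"),
    (1, "addr1", "ts5"), (1, "addr1", "ts6"),
]

# PARTITION BY client_number, address: addr1 at ts5 is dropped, because the
# key (1, "addr1") was already seen at ts1.
by_key_and_address = first_row_dedup(rows, key=lambda r: (r[0], r[1]))

# PARTITION BY client_number: only the very first row for client 1 passes.
by_key_only = first_row_dedup(rows, key=lambda r: r[0])

print(by_key_and_address)
print(by_key_only)
```

Under that assumption, neither choice of partition key can emit the row at
ts5, which matches what I observe.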



Basically, I need to deduplicate, *but keeping only the latest value of the
changed column in the deduplication state* to compare against, whereas here
it seems to keep all previous values...
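
As a sketch of the behavior I am after (plain Python, not a Flink API; the
name emit_on_change and the sample rows are hypothetical), the state would
hold a single value per primary key, namely the last one forwarded:

```python
from typing import Callable, Dict, Hashable, Iterable, List, Tuple

Row = Tuple[int, str, str]  # (client_number, address, load_date)


def emit_on_change(
    rows: Iterable[Row],
    key: Callable[[Row], Hashable],
    value: Callable[[Row], Hashable],
) -> List[Row]:
    """Forward a row only when its tracked value differs from the last one seen for its key."""
    last: Dict[Hashable, Hashable] = {}  # one entry per primary key, overwritten on change
    out: List[Row] = []
    for row in rows:
        k, v = key(row), value(row)
        if k not in last or last[k] != v:
            last[k] = v
            out.append(row)
    return out


rows = [
    (1, "addr1", "ts1"), (1, "addr1", "ts2"),
    (1, "addr2", "ts3"), (1, "addr2", "ts4"),
    (1, "addr1", "ts5"), (1, "addr1", "ts6"),
]

changes = emit_on_change(rows, key=lambda r: r[0], value=lambda r: r[1])
print(changes)
```

Here the rows at ts1, ts3 and ts5 are forwarded, because going back to addr1
is compared only with the latest stored value (addr2), not with the whole
history.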

Is there a way to obtain the behavior I need (with this deduplication
method or another one)?

Thanks in advance.

Best Regards,

Laurent.


On Thu, 12 Nov 2020 at 12:48, Jark Wu <imj...@gmail.com> wrote:

> Hi Laurent,
>
> 1. Currently, it's impossible to convert deduplicate with last row into an
> append-only stream.
> 2. Yes, I think Ververica platform doesn't support 'changelog-json' format
> natively.
>
> However, regarding your case, I think you can use keep first row on
> client_number+address key.
>
> SELECT *
> FROM (
>    SELECT client_number, address, load_date,
>      ROW_NUMBER() OVER
>        (PARTITION BY client_number, address ORDER BY proctime() ASC) AS rownum
>    FROM src)
> WHERE rownum = 1
>
> That means, the duplicate records on the same client_number + address will
> be ignored,
> but the new value of address will be emitted as an append-only stream.
>
> Hope this helps you.
>
> Best,
> Jark
>
>
> On Thu, 12 Nov 2020 at 17:49, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hi Jark,
>>
>> thanks again for your quick response!
>>
>> I tried multiple variants of my query by:
>> - specifying only the primary key in the PARTITION BY clause
>> - changing the order to DESC to keep the last row
>>
>> --> I unfortunately always get the same error message.
>> If I try to make a simple select on the result of this query, I also get
>> the following error: The submitted query is not an append-only query.
>> Only queries producing exclusively new rows over time are supported at the
>> moment. So whatever change I make, I never get an append-only query -->
>> Is there something I missed?
>>
>> I also tried to write to kafka as changelog-json, but I got the answer: The
>> sink connector for table `vvp`.`default`.`sat_customers_address` could not
>> be created. 'changelog-json' is not a supported sink format. Supported sink
>> formats are: ['avro', 'avro-confluent', 'csv', 'json', 'orc', 'parquet'].
>> (maybe because I'm on the Ververica platform?)
>> This also seems to require an extra Kafka topic, so it is not ideal.
>>
>>
>> *I'm starting to wonder if the deduplication query is really what I need.*
>>
>> What I need is:
>> - to forward only the records where some columns (ideally configurable)
>> change for a specific primary key.
>> - in realtime (no windowing)
>> - and have as a result an append-only stream.
>>
>> Like this:
>>
>> *input:*                                        *output* (this is what should ultimately be published to Kafka and later inserted in an RDBMS):
>> | client_number | address | load_date |         | client_number | address | load_date |
>> | ------------- | ------- | --------- |         | ------------- | ------- | --------- |
>> | 1             | addr1   | ts1       |   -->   | 1             | addr1   | ts1       |
>> | 1             | addr1   | ts2       |
>> | 1             | addr2   | ts3       |   -->   | 1             | addr2   | ts3       |
>> | 1             | addr2   | ts4       |
>> | 1             | addr1   | ts5       |   -->   | 1             | addr1   | ts5       |
>> | 1             | addr1   | ts6       |
>>
>>
>> --> is this deduplication query the right fit therefore?
>>      - if yes, how should it be written to generate an append-only stream?
>>      - If not, are there other options? (Match Recognize, UDF, ....?)
>>
>> Thanks a lot for your much appreciated help :).
>>
>> Best Regards,
>>
>> Laurent.
>>
>>
>> On Thu, 12 Nov 2020 at 07:26, Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Laurent,
>>>
>>> > What I want is a record to be forwarded only if some of the columns
>>> change
>>>
>>> IIUC, what you want is still deduplication with the last row.
>>> Keeping the first row will drop all the duplicate rows on the same primary
>>> key.
>>> Keeping the last row will emit updates when there are duplicate rows on the
>>> same primary key, which means column value changes will notify downstream
>>> operators.
>>> The difference between keeping the first row and the last row is specified
>>> by the direction of the ORDER BY clause [1].
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>>>
>>>
>>>
>>>
>>> On Thu, 12 Nov 2020 at 14:11, Laurent Exsteens <
>>> laurent.exste...@euranova.eu> wrote:
>>>
>>>> Thanks.
>>>>
>>>> I actually want the first row. What I want is a record to be forwarded
>>>> only if some of the columns change (of course keyed by the primary key). I
>>>> used rownum = 1, is that not selecting the first row?
>>>>
>>>> How can I adapt my query to let only the row effectively changing the
>>>> values pass, as an append only stream?
>>>>
>>>> If not possible, I'll look at converting it after. But I prefer a
>>>> solution in the deduplication query.
>>>> The goal is to show my customer that what they want to achieve is very
>>>> straightforward in flink SQL, so the simpler the queries the better. I need
>>>> to present my conclusions tomorrow.
>>>>
>>>> Thanks a lot already for your help!
>>>>
>>>> Best regards,
>>>>
>>>> Laurent.
>>>>
>>>>
>>>> On Thu, Nov 12, 2020, 03:43 Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi Laurent,
>>>>>
>>>>> 1. Deduplication that keeps the first row will generate an append-only
>>>>> stream. But I guess you are expecting to keep the last row, which
>>>>> generates an updating stream. As an alternative, you can use the
>>>>> "changelog-json" format in this repo [1]; it will convert the updating
>>>>> stream into append records with the change flag encoded.
>>>>> 2. Yes. It will replace records with the same key, i.e. upsert
>>>>> statement.
>>>>> 3. I think it will be available in one or two months. There will be a
>>>>> first release candidate soon.
>>>>>     You can watch the dev ML. I'm not sure about the plan for the
>>>>> Ververica platform, cc @Konstantin Knauf <konstan...@ververica.com>
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> [1]:
>>>>> https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format
>>>>>
>>>>> On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens <
>>>>> laurent.exste...@euranova.eu> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> thanks for your quick reply. I was indeed expecting it.
>>>>>>
>>>>>> But that triggers the following questions:
>>>>>>
>>>>>>    1. Is there another way to do this deduplication and generate an
>>>>>>    append-only stream? Match Recognize? UDF? ...?
>>>>>>    2. If I used Postgres as a sink, what would happen? Will the
>>>>>>    events be appended or will they replace the record with the same key?
>>>>>>    3. When will release-1.12 be available? And when would it be
>>>>>>    integrated in the Ververica platform?
>>>>>>
>>>>>> Thanks a lot for your help!
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Laurent.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, 11 Nov 2020 at 03:31, Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Laurent,
>>>>>>>
>>>>>>> This is because the deduplicate node generates an updating stream,
>>>>>>> however Kafka currently only supports append-only stream.
>>>>>>> This can be addressed in release-1.12, because we introduce a new
>>>>>>> connector "upsert-kafka" which supports writing updating
>>>>>>>  streams into Kafka compacted topics.
>>>>>>>
>>>>>>> Does the "Kafka ingestion date" refer to "kafka message timestamp",
>>>>>>> i.e. ConsumerRecord#timestamp()?
>>>>>>> If yes, this is also supported in release-1.12 via metadata syntax
>>>>>>> in DDL [1]:
>>>>>>>
>>>>>>> CREATE TABLE kafka_table (
>>>>>>>   id BIGINT,
>>>>>>>   name STRING,
>>>>>>>   `timestamp` BIGINT METADATA  -- read timestamp
>>>>>>> ) WITH (
>>>>>>>   'connector' = 'kafka',
>>>>>>>   'topic' = 'test-topic',
>>>>>>>   'format' = 'avro'
>>>>>>> )
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> [1]:
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>>>>>>
>>>>>>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>>>>>>> laurent.exste...@euranova.eu> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I'm getting an error in Flink SQL when reading from kafka,
>>>>>>>> deduplicating records and sending them back to Kafka.
>>>>>>>>
>>>>>>>> The behavior I want is the following:
>>>>>>>>
>>>>>>>> *input:*
>>>>>>>> | client_number | address |
>>>>>>>> | ------------------- | ----------- |
>>>>>>>> | 1                      | addr1     |
>>>>>>>> | 1                      | addr1     |
>>>>>>>> | 1                      | addr2     |
>>>>>>>> | 1                      | addr2     |
>>>>>>>> | 1                      | addr1     |
>>>>>>>> | 1                      | addr1     |
>>>>>>>>
>>>>>>>> *output:*
>>>>>>>> | client_number | address |
>>>>>>>> | ------------------- | ----------- |
>>>>>>>> | 1                      | addr1     |
>>>>>>>> | 1                      | addr2     |
>>>>>>>> | 1                      | addr1     |
>>>>>>>>
>>>>>>>> The error seems to say that the type of stream created by the
>>>>>>>> deduplication query is of "update & delete" type, while kafka only
>>>>>>>> supports append-only:
>>>>>>>>
>>>>>>>> Unsupported query
>>>>>>>> Table sink 'vvp.default.sat_customers_address' doesn't support
>>>>>>>> consuming update and delete changes which is produced by node
>>>>>>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>>>>>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number,
>>>>>>>> address, $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>>>>>>
>>>>>>>>
>>>>>>>> --> Is there a way to create an append only query from this kind of
>>>>>>>> deduplication query (see my code here below)?
>>>>>>>> --> Would that work if I would use, say, a Postgres sink?
>>>>>>>>
>>>>>>>> Bonus question: can we extract the Kafka ingestion date using Flink
>>>>>>>> SQL? (here I generated a processing date to allow ordering during
>>>>>>>> deduplication)
>>>>>>>>
>>>>>>>> P.S.: I'm on the Ververica Platform, but I guess this error is
>>>>>>>> linked to Flink SQL itself.
>>>>>>>>
>>>>>>>> Thanks in advance for your help.
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>>
>>>>>>>> Laurent.
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Read from customers kafka topic
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY TABLE customers (
>>>>>>>> `client_number` INT,
>>>>>>>> `name` VARCHAR(100),
>>>>>>>> `address` VARCHAR(100)
>>>>>>>> )
>>>>>>>> COMMENT ''
>>>>>>>> WITH (
>>>>>>>> 'connector' = 'kafka',
>>>>>>>> 'format' = 'csv',
>>>>>>>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>>>> 'topic' = 'customers',
>>>>>>>> 'csv.field-delimiter' = ';',
>>>>>>>> 'scan.startup.mode' = 'earliest-offset'
>>>>>>>> );
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Add metadata
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY VIEW metadata AS
>>>>>>>> SELECT *
>>>>>>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>>>>>>> , current_timestamp AS load_date
>>>>>>>> , 'Kafka topic: customers' AS record_source
>>>>>>>> FROM customers;
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Deduplicate addresses
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY VIEW dedup_address as
>>>>>>>> SELECT customer_pk
>>>>>>>> , client_number
>>>>>>>> , load_date
>>>>>>>> , address
>>>>>>>> FROM (
>>>>>>>> SELECT customer_pk
>>>>>>>> , client_number
>>>>>>>> , load_date
>>>>>>>> , record_source
>>>>>>>> , address
>>>>>>>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number,
>>>>>>>> address ORDER BY load_date ASC) AS rownum
>>>>>>>> FROM metadata
>>>>>>>> ) where rownum = 1;
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -----------------------------------
>>>>>>>> -- Send to sat_customers_address kafka topic
>>>>>>>> -----------------------------------
>>>>>>>> CREATE TEMPORARY TABLE sat_customers_address (
>>>>>>>> `customer_pk` VARCHAR(64),
>>>>>>>> `client_number` INT,
>>>>>>>> `address` VARCHAR(100)
>>>>>>>> )
>>>>>>>> COMMENT ''
>>>>>>>> WITH (
>>>>>>>> 'connector' = 'kafka',
>>>>>>>> 'format' = 'csv',
>>>>>>>> 'properties.bootstrap.servers' =
>>>>>>>> 'kafka-0.kafka-headless.vvp.svc.cluster.local:9092',
>>>>>>>> 'properties.group.id' = 'flinkSQL',
>>>>>>>> 'topic' = 'sat_customers_address'
>>>>>>>> );
>>>>>>>>
>>>>>>>> INSERT INTO sat_customers_address
>>>>>>>> SELECT customer_pk
>>>>>>>> , client_number
>>>>>>>> , address
>>>>>>>> FROM dedup_address;
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Laurent Exsteens*
>>>>>>>> Data Engineer
>>>>>>>> (M) +32 (0) 486 20 48 36
>>>>>>>>
>>>>>>>> *EURA NOVA*
>>>>>>>>
>>>>>>>> Rue Emile Francqui, 4
>>>>>>>>
>>>>>>>> 1435 Mont-Saint-Guibert
>>>>>>>>
>>>>>>>> (T) +32 10 75 02 00
>>>>>>>>
>>>>>>>> *euranova.eu <http://euranova.eu/>*
>>>>>>>>
>>>>>>>> *research.euranova.eu* <http://research.euranova.eu/>
>>>>>>>>
>>>>>>>> ♻ Be green, keep it on the screen
