Hi Hemi

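You cannot just filter out the deleted records. A plain filter only stops
further changes from reaching the sink; it never emits a delete.

You need a query like the following so that a delete record is generated.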

```
CREATE TABLE test_source (f1 xxx, f2 xxx, f3 xxx, deleted boolean) WITH
(.....);

INSERT INTO es_sink
SELECT f1, f2, f3
FROM (
    SELECT *,
        -- DESC keeps the latest row per key, so a soft delete replaces the live row
        ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime() DESC) AS rnk
    FROM test_source
) WHERE rnk = 1 AND deleted = false;
```
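
For your deletedAt column, the same pattern might look roughly like the sketch below. The column names `id` and `name` are placeholders I am assuming (use your real primary key and fields), and it assumes `table_source` is an append-only stream of row versions. I have not run it against your setup.

```sql
-- Sketch only: `id` and `name` are hypothetical columns, not from your schema.
INSERT INTO es_sink_table
SELECT id, name
FROM (
    SELECT *,
        -- Keep only the most recent version of each record.
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime() DESC) AS rnk
    FROM table_source
) WHERE rnk = 1 AND deletedAt IS NULL;
```

The idea is that once the latest version of a record has deletedAt set, the row that previously passed the filter is retracted, and that retraction is what should reach the sink as a delete. The deduplication has to keep state for every key, which is the cost of this approach.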


Best,
Feng


On Sat, Oct 21, 2023 at 11:32 PM Hemi Grs <hemi...@gmail.com> wrote:

> Hi Feng,
>
> Thanks for the reply. I actually already tried that, because we have a
> deletedAt column (when a record is deleted, that column is updated with
> the deletion timestamp).
>
> What I've tried is as follows:
>
> insert into es_sink_table
> select * from table_source where deletedAt is null
>
> What it actually does is just stop sending further changes to Elastic,
> because the record is no longer selected, but it's not actually deleting
> the documents in Elastic.
> What I want to achieve is to actually delete the record/document in
> Elastic ...
>
> What I'm thinking of right now is just running a job every few minutes
> to delete all records in Elastic where deletedAt is not null. But this
> approach is not elegant at all :) That's why I was hoping there'd be a
> better method to do this ...
>
>
>
> On Sat, Oct 21, 2023 at 5:21 PM Feng Jin <jinfeng1...@gmail.com> wrote:
>
>> Hi Hemi,
>>
>> Here is one possible way, though it may generate a lot of unneeded state.
>>
>> As shown below:
>> ```
>> CREATE TABLE test_source (f1 xxx, f2 xxx, f3 xxx, deleted boolean) WITH
>> (.....);
>>
>> INSERT INTO es_sink
>> SELECT f1, f2, f3
>> FROM (
>>     SELECT *,
>>         -- DESC keeps the latest row per key, so a soft delete replaces the live row
>>         ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime() DESC) AS rnk
>>     FROM test_source
>> ) WHERE rnk = 1 AND deleted = false;
>> ```
>>
>> Best,
>> Feng
>>
>> On Fri, Oct 20, 2023 at 1:38 PM Hemi Grs <hemi...@gmail.com> wrote:
>>
>>> hello everyone,
>>>
>>> right now I'm using flink to sync from mysql to elasticsearch and so far
>>> so good. If we insert, update, or delete it will sync from mysql to elastic
>>> without any problem.
>>>
>>> The problem I have right now is that the application does not actually
>>> hard delete the records in MySQL; it does a soft delete (updating a
>>> deletedAt column).
>>>
>>> Because it's not actually doing a deletion, flink is not deleting the
>>> data in elastic. How do I make it so it will delete the data in elastic?
>>>
>>> Thanks
>>>
>>
