Hi Hemi You can not just filter the delete records.
You must use the following syntax to generate a delete record. ``` CREATE TABLE test_source (f1 xxx, f2, xxx, f3 xxx, deleted boolean) with (.....); INSERT INTO es_sink SELECT f1, f2, f3 FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime()) as rnk from test_source ) where rnk = 1 AND deleted = false; ``` Best, Feng On Sat, Oct 21, 2023 at 11:32 PM Hemi Grs <hemi...@gmail.com> wrote: > Hi Feng, > > thanks for the reply. I actually already tried that because we have a > deletedAt column (if the record is deleted then it will update with the > timestamp to that column). > > What I've tried is as follows: > > insert into es_sink_table > select * from table_source where deletedAt is null > > what it actually do is just not updating any more changes to the elastic > because it's not queried but it't not actually deleting the documents in > elastic. > What I want to achieve is to actually delete the record/document in > elastic ... > > What I'm thinking right now is actually just doing a job every few minutes > to delete all records where the deletedAt is not null in elastic. But this > approach is not elegant at all :) That's why I was hoping there'll be a > better method to do this ... > > > > On Sat, Oct 21, 2023 at 5:21 PM Feng Jin <jinfeng1...@gmail.com> wrote: > >> Hi Hemi, >> >> One possible way, but it may generate many useless states. >> >> As shown below: >> ``` >> CREATE TABLE test_source (f1 xxx, f2, xxx, f3 xxx, deleted boolean) with >> (.....); >> >> INSERT INTO es_sink >> SELECT f1, f2, f3 >> FROM ( >> SELECT *, >> ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime()) as rnk >> from test_source >> ) where rnk = 1 AND deleted = false; >> ``` >> >> Best, >> Feng >> >> On Fri, Oct 20, 2023 at 1:38 PM Hemi Grs <hemi...@gmail.com> wrote: >> >>> hello everyone, >>> >>> right now I'm using flink to sync from mysql to elasticsearch and so far >>> so good. If we insert, update, or delete it will sync from mysql to elastic >>> without any problem. >>> >>> The problem I have right now is the application is not actually doing >>> hard delete to the records in mysql, but doing soft delete (updating a >>> deletedAt column). >>> >>> Because it's not actually doing a deletion, flink is not deleting the >>> data in elastic. How do I make it so it will delete the data in elastic? >>> >>> Thanks >>> >>