Hi Rex,

A similar question was asked recently [1], and I think the cause is the same: an effect called retraction amplification. You can try turning on the mini-batch optimization [2] to reduce it.
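
In case it helps, here is a minimal sketch of enabling mini-batch from the Table API. The three option keys are the ones described in [2]; the values ("5 s" and "5000") are only the documentation's example values, not something tuned for your job, and the class name is made up for illustration. I am assuming you can reach the TableConfig of your own table environment in the same way.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical class name, for illustration only.
public class MiniBatchConfigSketch {
    public static void main(String[] args) {
        // Assumption: a plain streaming-mode table environment; use your own instead.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Low-level key-value options of the table config.
        Configuration conf = tEnv.getConfig().getConfiguration();
        // Enable mini-batch so input records are buffered before aggregating.
        conf.setString("table.exec.mini-batch.enabled", "true");
        // Upper bound on how long records are buffered (example value).
        conf.setString("table.exec.mini-batch.allow-latency", "5 s");
        // Upper bound on the number of records buffered per task (example value).
        conf.setString("table.exec.mini-batch.size", "5000");

        // ... register your tables and run your INSERT INTO statements after this point.
    }
}
```

Note that this trades some latency for fewer intermediate changes, so the allow-latency value bounds how stale the Elasticsearch documents may be.
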
Best,
Jark

[1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
[2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation

On Fri, 6 Nov 2020 at 03:56, Rex Fenley <r...@remind101.com> wrote:

> Also, just to be clear our ES connector looks like this:
>
> CREATE TABLE sink_es_groups (
>   id BIGINT,
>   //.. a bunch of scalar fields
>   array_of_ids ARRAY<BIGINT NOT NULL>,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = '${env:ELASTICSEARCH_HOSTS}',
>   'index' = '${env:GROUPS_ES_INDEX}',
>   'format' = 'json',
>   'sink.bulk-flush.max-actions' = '512',
>   'sink.bulk-flush.max-size' = '1mb',
>   'sink.bulk-flush.interval' = '5000',
>   'sink.bulk-flush.backoff.delay' = '1000',
>   'sink.bulk-flush.backoff.max-retries' = '4',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
> )
>
>
> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Hello,
>>
>> I'm using the Table API to do a bunch of stateful transformations on CDC
>> Debezium rows and then insert final documents into Elasticsearch via the ES
>> connector.
>>
>> I've noticed that Elasticsearch is constantly deleting and then inserting
>> documents as they update. Ideally, there would be no delete operation for a
>> row update, only for a delete. I'm using the Elasticsearch 7 SQL connector,
>> which I'm assuming uses `Elasticsearch7UpsertTableSink` under the hood,
>> which implies upserts are actually what it's capable of.
>>
>> Therefore, I think it's possibly my table plan that's causing row upserts
>> to turn into deletes + inserts. My plan is essentially a series of Joins
>> and GroupBys + UDF Aggregates (aggregating arrays of data). I think,
>> possibly the UDF Aggs following the Joins + GroupBys are causing the
>> upserts to split into delete + inserts somehow. If this is correct, is it
>> possible to make UDFs that preserve Upserts? Or am I totally off-base with
>> my assumptions?
>>
>> Thanks!
>> --
>>
>> Rex Fenley | Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>