Thanks! We did give the mini-batch optimization a shot and ran into the bug
I reported here: https://issues.apache.org/jira/browse/FLINK-20036.
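For anyone trying the same thing, turning mini-batch on looks roughly like
this (a sketch only; the `table.exec.mini-batch.*` keys are the documented
ones from the tuning page Jark linked below [2], and the latency/size
values here are illustrative, not necessarily what we ran):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class EnableMiniBatch {
        public static void main(String[] args) {
            // Blink planner in streaming mode; mini-batch is a Blink planner feature.
            TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

            Configuration conf = tEnv.getConfig().getConfiguration();
            conf.setString("table.exec.mini-batch.enabled", "true");      // buffer input records before firing the aggregation
            conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // flush the buffer at least every 5 seconds
            conf.setString("table.exec.mini-batch.size", "5000");         // ...or once 5000 records are buffered
        }
    }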
I'm also seeing this function:

    public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL

and the docs say it is more performant in some cases than:

    public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL

I'm having some trouble understanding in which cases it benefits
performance and whether it would help our case. Would using
`emitUpdateWithRetract` instead of `emitValue` reduce the number of
retracts we're seeing while preserving the same end results, i.e. our
Elasticsearch documents staying up to date? (I've sketched my mental model
of the two emit paths below the quoted thread.)

On Sun, Nov 8, 2020 at 6:43 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Rex,
>
> There is a similar question asked recently [1] which I think has the
> same cause, called retraction amplification.
> You can try to turn on the mini-batch optimization [2] to reduce the
> retraction amplification.
>
> Best,
> Jark
>
> [1]:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>
> On Fri, 6 Nov 2020 at 03:56, Rex Fenley <r...@remind101.com> wrote:
>
>> Also, just to be clear, our ES connector looks like this:
>>
>> CREATE TABLE sink_es_groups (
>>     id BIGINT,
>>     -- ... a bunch of scalar fields
>>     array_of_ids ARRAY<BIGINT NOT NULL>,
>>     PRIMARY KEY (id) NOT ENFORCED
>> ) WITH (
>>     'connector' = 'elasticsearch-7',
>>     'hosts' = '${env:ELASTICSEARCH_HOSTS}',
>>     'index' = '${env:GROUPS_ES_INDEX}',
>>     'format' = 'json',
>>     'sink.bulk-flush.max-actions' = '512',
>>     'sink.bulk-flush.max-size' = '1mb',
>>     'sink.bulk-flush.interval' = '5000',
>>     'sink.bulk-flush.backoff.delay' = '1000',
>>     'sink.bulk-flush.backoff.max-retries' = '4',
>>     'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>> )
>>
>> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Hello,
>>>
>>> I'm using the Table API to do a bunch of stateful transformations on
>>> CDC Debezium rows and then insert the final documents into
>>> Elasticsearch via the ES connector.
>>>
>>> I've noticed that Elasticsearch is constantly deleting and then
>>> inserting documents as they update. Ideally, there would be no delete
>>> operation for a row update, only for an actual row deletion. I'm using
>>> the Elasticsearch 7 SQL connector, which I assume uses
>>> `Elasticsearch7UpsertTableSink` under the hood, which implies upserts
>>> are what it is actually capable of.
>>>
>>> Therefore, I think it is possibly my table plan that's causing row
>>> upserts to turn into deletes + inserts. My plan is essentially a series
>>> of joins and group-bys plus UDF aggregates (aggregating arrays of
>>> data). Possibly the UDF aggregates following the joins + group-bys are
>>> causing the upserts to split into deletes + inserts somehow. If this is
>>> correct, is it possible to write UDFs that preserve upserts? Or am I
>>> totally off base with my assumptions?
>>>
>>> Thanks!
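
Coming back to `emitUpdateWithRetract`: to make sure I'm reading the
contract right, here is my mental model, adapted from the
`Top2WithRetract` example in the Flink docs (the names below come from
that example, not from our job, so treat this as a sketch):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.util.Collector;

    // Emits the top 2 values seen so far as (value, rank) rows.
    public class Top2WithRetract
            extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetract.Top2Accum> {

        public static class Top2Accum {
            public Integer first = Integer.MIN_VALUE;
            public Integer second = Integer.MIN_VALUE;
            // Last values handed downstream, so we can retract exactly what changed.
            public Integer oldFirst = Integer.MIN_VALUE;
            public Integer oldSecond = Integer.MIN_VALUE;
        }

        @Override
        public Top2Accum createAccumulator() {
            return new Top2Accum();
        }

        public void accumulate(Top2Accum acc, Integer v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

        // emitValue: re-emits the WHOLE result on every change; the planner
        // retracts all previously emitted rows and inserts the new ones.
        public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }

        // emitUpdateWithRetract: only rows whose value actually changed are
        // retracted and re-emitted; unchanged rows produce no traffic at all.
        public void emitUpdateWithRetract(
                Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
            if (!acc.first.equals(acc.oldFirst)) {
                if (acc.oldFirst != Integer.MIN_VALUE) {
                    out.retract(Tuple2.of(acc.oldFirst, 1)); // drop the stale rank-1 row
                }
                out.collect(Tuple2.of(acc.first, 1));
                acc.oldFirst = acc.first;
            }
            if (!acc.second.equals(acc.oldSecond)) {
                if (acc.oldSecond != Integer.MIN_VALUE) {
                    out.retract(Tuple2.of(acc.oldSecond, 2)); // drop the stale rank-2 row
                }
                out.collect(Tuple2.of(acc.second, 2));
                acc.oldSecond = acc.second;
            }
        }
    }

If I've got that right, the savings show up when only a small slice of the
emitted result changes per input row; an update still goes out as a
retract + insert pair, so I'm not sure it would remove the deletes on the
Elasticsearch side by itself.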
--
Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>