Wow, that sounds definitely better. I'll try porting our aggregates over to using `emitUpdateWithRetract` then. I'm assuming the Elasticsearch SQL connector will respond appropriately.
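For anyone who finds this thread later, here's roughly the shape of the port I have in mind, adapted from the Top2 table aggregate example in the Flink docs. The accumulator remembers what it last emitted, so `emitUpdateWithRetract` only retracts the rows that actually changed. This is an untested sketch and all the class/field names are placeholders; only the `emitValue`/`emitUpdateWithRetract` signatures come from the `TableAggregateFunction` contract. Both emit methods are shown for contrast. (The mini-batch settings Jark linked earlier in the thread are in a P.S. at the bottom of this message.)

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.util.Collector;

    // Emits (value, rank) pairs for the top 2 values seen so far.
    public class Top2WithRetract
            extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetract.Top2Acc> {

        public static class Top2Acc {
            public Integer first = Integer.MIN_VALUE;      // current best
            public Integer second = Integer.MIN_VALUE;     // current runner-up
            public Integer oldFirst = Integer.MIN_VALUE;   // last emitted best
            public Integer oldSecond = Integer.MIN_VALUE;  // last emitted runner-up
        }

        @Override
        public Top2Acc createAccumulator() {
            return new Top2Acc();
        }

        public void accumulate(Top2Acc acc, Integer value) {
            if (value > acc.first) {
                acc.second = acc.first;
                acc.first = value;
            } else if (value > acc.second) {
                acc.second = value;
            }
        }

        // Fallback path: re-emits the full result set for every input row,
        // which is what becomes the -A, -B, -C, +A, +B, +D pattern downstream.
        public void emitValue(Top2Acc acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }

        // Incremental path: retract and re-emit only the rows that changed
        // since the last emission (the -C, +D case Jark described).
        public void emitUpdateWithRetract(
                Top2Acc acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
            if (!acc.first.equals(acc.oldFirst)) {
                if (acc.oldFirst != Integer.MIN_VALUE) {
                    out.retract(Tuple2.of(acc.oldFirst, 1));
                }
                out.collect(Tuple2.of(acc.first, 1));
                acc.oldFirst = acc.first;
            }
            if (!acc.second.equals(acc.oldSecond)) {
                if (acc.oldSecond != Integer.MIN_VALUE) {
                    out.retract(Tuple2.of(acc.oldSecond, 2));
                }
                out.collect(Tuple2.of(acc.second, 2));
                acc.oldSecond = acc.second;
            }
        }
    }

The idea is exactly what Jark described: because the accumulator remembers what it last told the framework, an update from [A, B, C] to [A, B, D] costs one retract and one insert instead of six messages.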
Thanks for the help!

On Wed, Nov 18, 2020 at 7:20 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Rex,
>
> Sorry for the late response.
>
> Under the hood, if the UDTAF only implements `emitValue`, the framework
> will call `emitValue` for every input record. Assuming this is a TopN
> UDTAF whose `emitValue` returns the set [A, B, C] for input1 and the set
> [A, B, D] for input2, the framework will send -A, -B, -C, +A, +B, +D
> after processing input2.
>
> But if the TopN UDTAF implements `emitUpdateWithRetract`, the UDTAF can
> send just -C and +D, because the TopN knows which row was updated. So it
> can "reduce the number of retracts".
>
> Best,
> Jark
>
> On Wed, 18 Nov 2020 at 14:32, Rex Fenley <r...@remind101.com> wrote:
>
>> Hi,
>>
>> Does this seem like it would help?
>>
>> Thanks!
>>
>> On Tue, Nov 10, 2020 at 10:38 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Thanks! We did give that a shot and ran into the bug that I reported
>>> here: https://issues.apache.org/jira/browse/FLINK-20036
>>>
>>> I'm also seeing this function
>>>
>>> public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
>>>
>>> and it says it's more performant in some cases than
>>>
>>> public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
>>>
>>> I'm having some trouble understanding in which cases it benefits
>>> performance and whether it would help our case. Would using
>>> `emitUpdateWithRetract` instead of `emitValue` reduce the number of
>>> retracts we're seeing yet preserve the same end results, where our
>>> Elasticsearch documents stay up to date?
>>>
>>> On Sun, Nov 8, 2020 at 6:43 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> There is a similar question asked recently which I think has the same
>>>> cause [1], called retraction amplification.
>>>> You can try turning on the mini-batch optimization [2] to reduce the
>>>> retraction amplification.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
>>>> [2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>>>>
>>>> On Fri, 6 Nov 2020 at 03:56, Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Also, just to be clear, our ES connector looks like this:
>>>>>
>>>>> CREATE TABLE sink_es_groups (
>>>>>   id BIGINT,
>>>>>   -- .. a bunch of scalar fields
>>>>>   array_of_ids ARRAY<BIGINT NOT NULL>,
>>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>>> ) WITH (
>>>>>   'connector' = 'elasticsearch-7',
>>>>>   'hosts' = '${env:ELASTICSEARCH_HOSTS}',
>>>>>   'index' = '${env:GROUPS_ES_INDEX}',
>>>>>   'format' = 'json',
>>>>>   'sink.bulk-flush.max-actions' = '512',
>>>>>   'sink.bulk-flush.max-size' = '1mb',
>>>>>   'sink.bulk-flush.interval' = '5000',
>>>>>   'sink.bulk-flush.backoff.delay' = '1000',
>>>>>   'sink.bulk-flush.backoff.max-retries' = '4',
>>>>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>>>>> )
>>>>>
>>>>> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm using the Table API to do a bunch of stateful transformations on
>>>>>> CDC Debezium rows and then insert final documents into Elasticsearch
>>>>>> via the ES connector.
>>>>>>
>>>>>> I've noticed that Elasticsearch is constantly deleting and then
>>>>>> inserting documents as they update.
>>>>>> Ideally, there would be no delete operation for a row update, only
>>>>>> for an actual delete. I'm using the Elasticsearch 7 SQL connector,
>>>>>> which I'm assuming uses `Elasticsearch7UpsertTableSink` under the
>>>>>> hood, which implies upserts are what it's actually capable of.
>>>>>>
>>>>>> Therefore, I think it's possibly my table plan that's causing row
>>>>>> upserts to turn into deletes + inserts. My plan is essentially a
>>>>>> series of Joins and GroupBys + UDF Aggregates (aggregating arrays of
>>>>>> data). I think the UDF Aggs following the Joins + GroupBys may be
>>>>>> causing the upserts to split into deletes + inserts somehow. If this
>>>>>> is correct, is it possible to make UDFs that preserve upserts? Or am
>>>>>> I totally off-base with my assumptions?
>>>>>>
>>>>>> Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
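P.S. For later readers: the mini-batch optimization Jark pointed to in link [2] above comes down to three settings on the TableConfig. A minimal sketch, assuming the Blink planner and the Java Table API as of the 1.11/1.12 releases; the latency and size values here are arbitrary and should be tuned per workload:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MiniBatchSetup {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance()
                            .useBlinkPlanner()
                            .inStreamingMode()
                            .build());

            // Buffer input records and fire aggregations once per mini-batch
            // instead of once per record, which collapses many of the
            // intermediate retract/insert pairs before they reach the sink.
            Configuration conf = tEnv.getConfig().getConfiguration();
            conf.setString("table.exec.mini-batch.enabled", "true");
            conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // max buffering delay
            conf.setString("table.exec.mini-batch.size", "5000");         // max records buffered
        }
    }

These are the settings documented in [2]; the trade-off is a little extra latency (up to `allow-latency`) in exchange for far fewer messages hitting Elasticsearch.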