Hi Rex,

Sorry for the late response.

Under the hood, if the UDTAF only implements `emitValue`, the framework calls `emitValue` for every input record. Suppose this is a TopN UDTAF whose `emitValue` returns the set [A, B, C] for input1 and the set [A, B, D] for input2. After processing input2, the framework retracts everything it emitted for input1 and then emits the new set, so it sends -A, -B, -C, +A, +B, +D. If the TopN UDTAF implements `emitUpdateWithRetract` instead, it can send just -C and +D from `emitUpdateWithRetract`, because the TopN knows which row was updated. That is how it can "reduce the number of retracts".

Best,
Jark

On Wed, 18 Nov 2020 at 14:32, Rex Fenley <r...@remind101.com> wrote:

> Hi,
>
> Does this seem like it would help?
>
> Thanks!
>
> On Tue, Nov 10, 2020 at 10:38 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Thanks! We did give that a shot and ran into the bug that I reported here
>> https://issues.apache.org/jira/browse/FLINK-20036 .
>>
>> I'm also seeing this function
>>
>> public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
>>
>> and it says it's more performant in some cases than
>>
>> public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
>>
>> I'm having some trouble understanding in which cases it benefits
>> performance and whether it would help our case. Would using
>> `emitUpdateWithRetract` instead of `emitValue` reduce the number of
>> retracts we're seeing yet preserve the same end results, where our
>> Elasticsearch documents stay up to date?
>>
>> On Sun, Nov 8, 2020 at 6:43 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Rex,
>>>
>>> A similar question was asked recently [1], and I think it has the same
>>> cause, called retraction amplification.
>>> You can try turning on the mini-batch optimization [2] to reduce the
>>> retraction amplification.
>>>
>>> Best,
>>> Jark
>>>
>>> [1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
>>> [2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>>>
>>> On Fri, 6 Nov 2020 at 03:56, Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Also, just to be clear, our ES connector looks like this:
>>>>
>>>> CREATE TABLE sink_es_groups (
>>>>   id BIGINT,
>>>>   -- ... a bunch of scalar fields
>>>>   array_of_ids ARRAY<BIGINT NOT NULL>,
>>>>   PRIMARY KEY (id) NOT ENFORCED
>>>> ) WITH (
>>>>   'connector' = 'elasticsearch-7',
>>>>   'hosts' = '${env:ELASTICSEARCH_HOSTS}',
>>>>   'index' = '${env:GROUPS_ES_INDEX}',
>>>>   'format' = 'json',
>>>>   'sink.bulk-flush.max-actions' = '512',
>>>>   'sink.bulk-flush.max-size' = '1mb',
>>>>   'sink.bulk-flush.interval' = '5000',
>>>>   'sink.bulk-flush.backoff.delay' = '1000',
>>>>   'sink.bulk-flush.backoff.max-retries' = '4',
>>>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>>>> )
>>>>
>>>>
>>>> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm using the Table API to do a bunch of stateful transformations on
>>>>> CDC Debezium rows and then insert final documents into Elasticsearch via
>>>>> the ES connector.
>>>>>
>>>>> I've noticed that Elasticsearch is constantly deleting and then
>>>>> inserting documents as they update. Ideally, there would be no delete
>>>>> operation for a row update, only for a delete. I'm using the Elasticsearch
>>>>> 7 SQL connector, which I'm assuming uses `Elasticsearch7UpsertTableSink`
>>>>> under the hood, which implies it is capable of upserts.
>>>>>
>>>>> Therefore, I suspect it's my table plan that's causing row
>>>>> upserts to turn into deletes + inserts. My plan is essentially a series of
>>>>> Joins and GroupBys + UDF Aggregates (aggregating arrays of data).
>>>>> I think the UDF Aggs following the Joins + GroupBys may be what is
>>>>> splitting the upserts into deletes + inserts. If this is correct, is it
>>>>> possible to make UDFs that preserve upserts? Or am I totally off-base with
>>>>> my assumptions?
>>>>>
>>>>> Thanks!
>>>>> --
>>>>>
>>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>>>> <https://www.facebook.com/remindhq>
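
To make the -A, -B, -C, +A, +B, +D versus -C, +D comparison from the top of this thread concrete, here is a plain-Java sketch that only models the messages sent downstream. It does not use Flink's API; `TopNRetractSketch` and its methods are hypothetical names, and it assumes the TopN result is a plain set of rows with no rank column.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the messages a TopN table aggregate sends downstream.
// It does not use Flink; the class and method names are hypothetical.
final class TopNRetractSketch {

    // emitValue-style: every previously emitted row is retracted, then the
    // whole new result set is emitted again.
    static List<String> emitValueStyle(List<String> previous, List<String> current) {
        List<String> out = new ArrayList<>();
        for (String row : previous) {
            out.add("-" + row);
        }
        for (String row : current) {
            out.add("+" + row);
        }
        return out;
    }

    // emitUpdateWithRetract-style: only rows that left the TopN are retracted
    // and only rows that entered it are emitted. A real UDTAF would need to keep
    // the previously emitted rows (e.g. in its accumulator) to compute this diff.
    static List<String> emitUpdateWithRetractStyle(List<String> previous, List<String> current) {
        List<String> out = new ArrayList<>();
        for (String row : previous) {
            if (!current.contains(row)) {
                out.add("-" + row);
            }
        }
        for (String row : current) {
            if (!previous.contains(row)) {
                out.add("+" + row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> afterInput1 = List.of("A", "B", "C");
        List<String> afterInput2 = List.of("A", "B", "D");
        System.out.println(emitValueStyle(afterInput1, afterInput2));
        System.out.println(emitUpdateWithRetractStyle(afterInput1, afterInput2));
    }
}
```

Running `main` prints `[-A, -B, -C, +A, +B, +D]` and then `[-C, +D]`. Both sequences leave the downstream table holding A, B, D; the second one simply gets there with two messages instead of six.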
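
The mini-batch suggestion from the Nov 8 message can be illustrated the same way. This is a simplified model of the idea, not Flink code: `MiniBatchSketch`, its methods and the key `g1` are hypothetical, and the aggregate is assumed to be a plain per-key sum. Records are buffered into batches, each key's inputs within a batch are folded first, and at most one retract/insert pair per key is emitted per batch. In Flink itself the batching is turned on through configuration; the tuning page linked as [2] documents `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and `table.exec.mini-batch.size`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of retraction amplification with and without mini-batching
// for a per-key SUM. Not Flink code; all names here are hypothetical.
final class MiniBatchSketch {

    // Without mini-batch: each input updates the sum and immediately emits a
    // retraction of the old result (if there was one) plus the new result.
    static List<String> perRecord(List<Map.Entry<String, Long>> inputs) {
        Map<String, Long> sums = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> in : inputs) {
            Long old = sums.get(in.getKey());
            long updated = (old == null ? 0L : old) + in.getValue();
            sums.put(in.getKey(), updated);
            if (old != null) {
                out.add("-" + in.getKey() + "=" + old);
            }
            out.add("+" + in.getKey() + "=" + updated);
        }
        return out;
    }

    // With mini-batch: inputs are grouped into batches of batchSize, folded per
    // key, and only then applied, so each key emits at most one pair per batch.
    static List<String> miniBatch(List<Map.Entry<String, Long>> inputs, int batchSize) {
        Map<String, Long> sums = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (int start = 0; start < inputs.size(); start += batchSize) {
            Map<String, Long> delta = new LinkedHashMap<>(); // keeps first-seen key order
            for (Map.Entry<String, Long> in : inputs.subList(start, Math.min(start + batchSize, inputs.size()))) {
                delta.merge(in.getKey(), in.getValue(), Long::sum);
            }
            for (Map.Entry<String, Long> d : delta.entrySet()) {
                Long old = sums.get(d.getKey());
                long updated = (old == null ? 0L : old) + d.getValue();
                sums.put(d.getKey(), updated);
                if (old != null) {
                    out.add("-" + d.getKey() + "=" + old);
                }
                out.add("+" + d.getKey() + "=" + updated);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Five quick updates to the same key.
        List<Map.Entry<String, Long>> inputs = List.of(
                Map.entry("g1", 1L), Map.entry("g1", 1L), Map.entry("g1", 1L),
                Map.entry("g1", 1L), Map.entry("g1", 1L));
        System.out.println(perRecord(inputs));
        System.out.println(miniBatch(inputs, 5));
    }
}
```

Here the per-record path emits nine messages (+g1=1, then four -old/+new pairs) while the batched path emits only `+g1=5`; both end at g1=5. The sketch only covers a group aggregation, so it says nothing about how much mini-batching helps the joins in this particular pipeline.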