Wow, that sounds definitely better. I'll try porting our aggregates over
to use `emitUpdateWithRetract` then. I'm assuming the Elasticsearch SQL
connector will respond appropriately.
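
To make sure I've understood the difference, here is a minimal plain-Java
sketch of the two behaviours from your TopN example. It does not use Flink
at all: the class `RetractSketch` and its methods `fullReemit` and
`incremental` are hypothetical names that only model which changes end up
in the changelog, not how the framework actually implements either method.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model only; none of these names come from Flink.
public class RetractSketch {

    // Models the emitValue-only case: the whole previous result is
    // retracted and the whole new result is emitted again.
    public static List<String> fullReemit(List<String> previous, List<String> current) {
        List<String> changes = new ArrayList<>();
        for (String row : previous) {
            changes.add("-" + row);
        }
        for (String row : current) {
            changes.add("+" + row);
        }
        return changes;
    }

    // Models the emitUpdateWithRetract case: only rows that left the
    // result are retracted, and only rows that entered it are emitted.
    public static List<String> incremental(List<String> previous, List<String> current) {
        Set<String> prev = new LinkedHashSet<>(previous);
        Set<String> cur = new LinkedHashSet<>(current);
        List<String> changes = new ArrayList<>();
        for (String row : prev) {
            if (!cur.contains(row)) {
                changes.add("-" + row);
            }
        }
        for (String row : cur) {
            if (!prev.contains(row)) {
                changes.add("+" + row);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        List<String> afterInput1 = List.of("A", "B", "C");
        List<String> afterInput2 = List.of("A", "B", "D");
        System.out.println(fullReemit(afterInput1, afterInput2));
        System.out.println(incremental(afterInput1, afterInput2));
    }
}
```

Running `main` prints `[-A, -B, -C, +A, +B, +D]` for the first strategy and
`[-C, +D]` for the second, which matches the two cases in your explanation.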

Thanks for the help!

On Wed, Nov 18, 2020 at 7:20 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Rex,
>
> Sorry for the late response.
>
> Under the hood, if the UDTAF only implements `emitValue`, then the
> framework will call `emitValue` for every input record. Assuming this is
> a TopN UDTAF whose `emitValue` returns the set [A, B, C] for input1 and
> the set [A, B, D] for input2, the framework will send -A, -B, -C, +A, +B,
> +D after processing input2.
>
> But if the TopN UDTAF implements `emitUpdateWithRetract`, the UDTAF
> can just send -C and +D in `emitUpdateWithRetract`,
> because the TopN knows which row is updated. So it can "reduce the number
> of retracts".
>
> Best,
> Jark
>
> On Wed, 18 Nov 2020 at 14:32, Rex Fenley <r...@remind101.com> wrote:
>
>> Hi,
>>
>> Does this seem like it would help?
>>
>> Thanks!
>>
>> On Tue, Nov 10, 2020 at 10:38 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Thanks! We did give that a shot and ran into the bug that I reported
>>> here https://issues.apache.org/jira/browse/FLINK-20036 .
>>>
>>> I'm also seeing this function
>>>
>>>   public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL
>>>
>>> and it says it's more performant in some cases vs
>>>
>>>   public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
>>>
>>> . I'm having some trouble understanding in which cases it benefits
>>> performance and if it would help our case. Would using
>>> `emitUpdateWithRetract` instead of `emitValue` reduce the number of
>>> retracts we're seeing yet preserve the same end results, where our
>>> Elasticsearch documents stay up to date?
>>>
>>> On Sun, Nov 8, 2020 at 6:43 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> A similar question was asked recently [1], and I think it has the same
>>>> cause, called retraction amplification.
>>>> You can try turning on the mini-batch optimization [2] to reduce the
>>>> retraction amplification.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> [1]:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
>>>> [2]:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>>>>
>>>> On Fri, 6 Nov 2020 at 03:56, Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Also, just to be clear our ES connector looks like this:
>>>>>
>>>>> CREATE TABLE sink_es_groups (
>>>>> id BIGINT,
>>>>> -- .. a bunch of scalar fields
>>>>> array_of_ids ARRAY<BIGINT NOT NULL>,
>>>>> PRIMARY KEY (id) NOT ENFORCED
>>>>> ) WITH (
>>>>> 'connector' = 'elasticsearch-7',
>>>>> 'hosts' = '${env:ELASTICSEARCH_HOSTS}',
>>>>> 'index' = '${env:GROUPS_ES_INDEX}',
>>>>> 'format' = 'json',
>>>>> 'sink.bulk-flush.max-actions' = '512',
>>>>> 'sink.bulk-flush.max-size' = '1mb',
>>>>> 'sink.bulk-flush.interval' = '5000',
>>>>> 'sink.bulk-flush.backoff.delay' = '1000',
>>>>> 'sink.bulk-flush.backoff.max-retries' = '4',
>>>>> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
>>>>> )
>>>>>
>>>>>
>>>>> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm using the Table API to do a bunch of stateful transformations on
>>>>>> CDC Debezium rows and then insert final documents into Elasticsearch via
>>>>>> the ES connector.
>>>>>>
>>>>>> I've noticed that Elasticsearch is constantly deleting and then
>>>>>> inserting documents as they update. Ideally, there would be no delete
>>>>>> operation for a row update, only for a delete. I'm using the
>>>>>> Elasticsearch 7 SQL connector, which I'm assuming uses
>>>>>> `Elasticsearch7UpsertTableSink` under the hood, which implies upserts
>>>>>> are actually what it's capable of.
>>>>>>
>>>>>> Therefore, I think it's possibly my table plan that's causing row
>>>>>> upserts to turn into deletes + inserts. My plan is essentially a series
>>>>>> of Joins and GroupBys + UDF Aggregates (aggregating arrays of data). I
>>>>>> think the UDF Aggs following the Joins + GroupBys are possibly causing
>>>>>> the upserts to split into deletes + inserts somehow. If this is correct,
>>>>>> is it possible to make UDFs that preserve upserts? Or am I totally
>>>>>> off-base with my assumptions?
>>>>>>
>>>>>> Thanks!
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>

