Hi Rex,

A similar question was asked recently [1], and I think the cause is the same:
something called retraction amplification.
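To illustrate what is meant, here is a minimal sketch of two cascaded aggregations. The `memberships` table and its `group_id`/`user_id` columns are hypothetical and are not taken from your job. With mini-batch disabled, one new membership row makes the first GROUP BY emit a retraction of the old count plus the new count. The second GROUP BY then processes each of those two messages separately and emits its own retract/insert pair for each, so a single input change can become up to four output messages.

```sql
-- `memberships` is a hypothetical table, used only for illustration.
-- Level 1: one row per group; every membership change updates it.
CREATE VIEW group_sizes AS
SELECT group_id, COUNT(user_id) AS member_count
FROM memberships
GROUP BY group_id;

-- Level 2: aggregates over the changelog produced by level 1.
-- A level-1 update arrives here as a retraction of the old row and
-- an insertion of the new row. Without mini-batch, each message is
-- processed on its own, so both the old and the new member_count
-- keys emit their own retract/insert pairs.
SELECT member_count, COUNT(group_id) AS num_groups
FROM group_sizes
GROUP BY member_count;
```

Each further aggregation stacked on a changelog can multiply the messages again, which is how a long chain of Joins and GroupBys can send many more changes to the sink than the source produced.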
You can try turning on the mini-batch optimization [2] to reduce the
retraction amplification.
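For reference, a minimal sketch of enabling it, assuming a SQL client session on a recent Flink version that accepts the quoted `SET 'key' = 'value'` syntax (older SQL clients used an unquoted form). The option names come from the mini-batch section of the tuning page [2]; the latency and size values are illustrative only and should be tuned for your job. With the Table API, the same keys can be set via `tEnv.getConfig().getConfiguration().setString(key, value)`, where `tEnv` stands for your TableEnvironment.

```sql
-- Enable mini-batch aggregation: buffer input records and fold
-- updates per key before emitting, instead of emitting per record.
SET 'table.exec.mini-batch.enabled' = 'true';

-- Illustrative value: the maximum time records may be buffered.
SET 'table.exec.mini-batch.allow-latency' = '5 s';

-- Illustrative value: the maximum number of records buffered
-- by each aggregate operator task.
SET 'table.exec.mini-batch.size' = '5000';
```

Mini-batch trades a little latency for fewer intermediate updates, so it reduces the number of retract/insert messages reaching the sink rather than eliminating them.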

Best,
Jark

[1]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/A-question-about-flink-sql-retreact-stream-td39216.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation

On Fri, 6 Nov 2020 at 03:56, Rex Fenley <r...@remind101.com> wrote:

> Also, to be clear, our ES connector looks like this:
>
> CREATE TABLE sink_es_groups (
> id BIGINT,
> -- ... a bunch of scalar fields
> array_of_ids ARRAY<BIGINT NOT NULL>,
> PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'elasticsearch-7',
> 'hosts' = '${env:ELASTICSEARCH_HOSTS}',
> 'index' = '${env:GROUPS_ES_INDEX}',
> 'format' = 'json',
> 'sink.bulk-flush.max-actions' = '512',
> 'sink.bulk-flush.max-size' = '1mb',
> 'sink.bulk-flush.interval' = '5000',
> 'sink.bulk-flush.backoff.delay' = '1000',
> 'sink.bulk-flush.backoff.max-retries' = '4',
> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
> )
>
>
> On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Hello,
>>
>> I'm using the Table API to do a bunch of stateful transformations on CDC
>> Debezium rows and then insert final documents into Elasticsearch via the ES
>> connector.
>>
>> I've noticed that Elasticsearch constantly deletes and then re-inserts
>> documents when they are updated. Ideally, a row update would produce no
>> delete operation; only an actual row deletion should. I'm using the
>> Elasticsearch 7 SQL connector, which I assume uses
>> `Elasticsearch7UpsertTableSink` under the hood, so it should be capable
>> of upserts.
>>
>> Therefore, I suspect it's my table plan that's turning row upserts into
>> deletes + inserts. My plan is essentially a series of Joins and GroupBys +
>> UDF Aggregates (aggregating arrays of data). Possibly the UDF Aggs
>> following the Joins + GroupBys are somehow splitting the upserts into
>> deletes + inserts. If so, is it possible to write UDFs that preserve
>> upserts? Or am I totally off-base with my assumptions?
>>
>> Thanks!
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
