Also, just to be clear, our ES connector looks like this:

CREATE TABLE sink_es_groups (
  id BIGINT,
  -- ... a bunch of scalar fields
  array_of_ids ARRAY<BIGINT NOT NULL>,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = '${env:ELASTICSEARCH_HOSTS}',
  'index' = '${env:GROUPS_ES_INDEX}',
  'format' = 'json',
  'sink.bulk-flush.max-actions' = '512',
  'sink.bulk-flush.max-size' = '1mb',
  'sink.bulk-flush.interval' = '5000',
  'sink.bulk-flush.backoff.delay' = '1000',
  'sink.bulk-flush.backoff.max-retries' = '4',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)
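
To make the plan shape concrete, here is a minimal, hypothetical sketch of the kind of query whose result would be written to this sink: a Join followed by a GroupBy with a UDF aggregate. The source tables `cdc_groups` and `cdc_group_members`, the column `user_id`, and the aggregate function `collect_ids` are made-up names for illustration only. The scalar fields are left out, so this is not the actual job and would need the remaining sink columns filled in before it could feed `sink_es_groups`.

```sql
-- Hypothetical sketch: table, column, and function names are assumptions.
-- collect_ids stands in for a user-defined aggregate function that
-- returns ARRAY<BIGINT NOT NULL>.
SELECT
  g.id,
  collect_ids(m.user_id) AS array_of_ids
FROM cdc_groups AS g
LEFT JOIN cdc_group_members AS m
  ON m.group_id = g.id
GROUP BY g.id;
```

The GROUP BY key here is the same `id` that the sink declares as its primary key, which is the case where I would expect an updated row to reach Elasticsearch as an upsert rather than as a delete followed by an insert.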

On Thu, Nov 5, 2020 at 11:52 AM Rex Fenley wrote:

> Hello,
>
> I'm using the Table API to do a bunch of stateful transformations on CDC
> Debezium rows and then insert final documents into Elasticsearch via the ES
> connector.
>
> I've noticed that Elasticsearch is constantly deleting and then inserting
> documents as they update. Ideally, there would be no delete operation for a
> row update, only for a delete. I'm using the Elasticsearch 7 SQL connector,
> which I'm assuming uses `Elasticsearch7UpsertTableSink` under the hood,
> which implies upserts are actually what it's capable of.
>
> Therefore, I think it's possibly my table plan that's causing row upserts
> to turn into deletes + inserts. My plan is essentially a series of Joins
> and GroupBys + UDF Aggregates (aggregating arrays of data). I think,
> possibly the UDF Aggs following the Joins + GroupBys are causing the
> upserts to split into delete + inserts somehow. If this is correct, is it
> possible to make UDFs that preserve Upserts? Or am I totally off-base with
> my assumptions?
>
> Thanks!
> --
>
> Rex Fenley | Software Engineer - Mobile and Backend

--

Rex Fenley | Software Engineer - Mobile and Backend