Re: Upsert UDFs

2020-11-18 Thread Rex Fenley
Wow, that sounds definitely better. I'll try porting our aggregates over to using `emitUpdateWithRetract` then. I'm assuming the Elasticsearch SQL connector will respond appropriately. Thanks for the help! On Wed, Nov 18, 2020 at 7:20 AM Jark Wu wrote: > Hi Rex, > > Sorry for the late respons

Re: Upsert UDFs

2020-11-18 Thread Jark Wu
Hi Rex, Sorry for the late response. Under the hood, if the UDTAF only implements `emitValue`, then the framework will call `emitValue` for every input record. Assuming this is a TopN UDTAF, the implementation of `emitValue` returns set [A, B, C] for input1 and returns set [A, B, D] for input2, t
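The preview above is cut off, so the following is only a sketch of the difference it appears to be describing, based on how the Flink documentation contrasts `emitValue` (emit the full result) with `emitUpdateWithRetract` (emit only what changed). It is plain Java with no Flink classes; `RetractionSketch`, `fullRefresh`, and `incremental` are hypothetical names used for illustration, and the `-`/`+` prefixes stand in for retract and insert messages.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the changelog sent downstream when a TopN result changes. */
final class RetractionSketch {

    /** emitValue-only style: retract every previously emitted row, then emit the full new result. */
    static List<String> fullRefresh(List<String> previous, List<String> current) {
        List<String> changelog = new ArrayList<>();
        for (String row : previous) {
            changelog.add("-" + row);
        }
        for (String row : current) {
            changelog.add("+" + row);
        }
        return changelog;
    }

    /** emitUpdateWithRetract style: retract only rows that dropped out, emit only rows that entered. */
    static List<String> incremental(List<String> previous, List<String> current) {
        List<String> changelog = new ArrayList<>();
        for (String row : previous) {
            if (!current.contains(row)) {
                changelog.add("-" + row);
            }
        }
        for (String row : current) {
            if (!previous.contains(row)) {
                changelog.add("+" + row);
            }
        }
        return changelog;
    }
}
```

With the sets from the message, going from [A, B, C] to [A, B, D], `fullRefresh` yields six messages (-A, -B, -C, +A, +B, +D) while `incremental` yields two (-C, +D). Under this model, the smaller changelog is what would spare a downstream sink from repeated delete-then-insert cycles.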

Re: Upsert UDFs

2020-11-17 Thread Rex Fenley
Hi, Does this seem like it would help? Thanks! On Tue, Nov 10, 2020 at 10:38 PM Rex Fenley wrote: > Thanks! We did give that a shot and ran into the bug that I reported here > https://issues.apache.org/jira/browse/FLINK-20036 . > > I'm also seeing this function > > public void emitUpdateWith

Re: Upsert UDFs

2020-11-10 Thread Rex Fenley
Thanks! We did give that a shot and ran into the bug that I reported here https://issues.apache.org/jira/browse/FLINK-20036 . I'm also seeing this function public void emitUpdateWithRetract(ACC accumulator, RetractableCollector out); // OPTIONAL and it says it's more performant in some cases v

Re: Upsert UDFs

2020-11-08 Thread Jark Wu
Hi Rex, A similar question was asked recently [1], and I think the cause is the same: a problem called retraction amplification. You can try turning on the mini-batch optimization to reduce it. Best, Jark [1]: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
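For reference, the mini-batch optimization mentioned above is controlled by three table options, to the best of my knowledge. The values below are illustrative assumptions taken from typical documentation examples, not settings recommended in this thread, and how they are supplied (table config, SQL client, or cluster configuration) depends on the deployment.

```properties
# Buffer input records and process them in small batches instead of one at a time
table.exec.mini-batch.enabled = true
# Maximum time a record may wait in the buffer (assumed value)
table.exec.mini-batch.allow-latency = 5 s
# Maximum number of records buffered per batch (assumed value)
table.exec.mini-batch.size = 5000
```

Batching lets several updates to the same key collapse into one downstream change, at the cost of up to the configured latency.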

Re: Upsert UDFs

2020-11-05 Thread Rex Fenley
Also, just to be clear our ES connector looks like this: CREATE TABLE sink_es_groups ( id BIGINT, //.. a bunch of scalar fields array_of_ids ARRAY, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = '${env:ELASTICSEARCH_HOSTS}', 'index' = '${env:GROUPS_ES_INDEX}', 'f

Upsert UDFs

2020-11-05 Thread Rex Fenley
Hello, I'm using the Table API to do a bunch of stateful transformations on CDC Debezium rows and then insert final documents into Elasticsearch via the ES connector. I've noticed that Elasticsearch is constantly deleting and then inserting documents as they update. Ideally, there would be no del