UPDATE_BEFORE is required in cases such as an aggregation followed by a filter. For example:

SELECT * FROM (
  SELECT word, count(*) AS cnt FROM T GROUP BY word
) WHERE cnt < 3;

When the count of a word goes from 2 to 3, the aggregation emits UPDATE_BEFORE (word, 2) and UPDATE_AFTER (word, 3). The filter drops the UPDATE_AFTER row because 3 is not less than 3, so only the UPDATE_BEFORE row reaches the sink. If the sink ignores that row instead of treating it as a DELETE, the stale row (word, 2) stays in the index.

There is more discussion in this issue: https://issues.apache.org/jira/browse/FLINK-9528

Best,
Jark

On Mon, 28 Jun 2021 at 13:52, Kai Fu <zzfu...@gmail.com> wrote:

> Hi team,
>
> We noticed that the UPDATE_BEFORE row kind is not dropped but treated as
> DELETE, as in the code
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L81-L84>.
> We're aware that this is useful to retract output records in some cases,
> but we cannot come up with such a scenario. Could anyone name a few cases
> for it?
>
> The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
> connector to reduce the sink traffic, since almost all of our records are
> updates. In our case, the records are generated by joining with a couple of
> upsert-kafka data sources. Only primary keys participate in the join
> condition in all join cases, with some granularity/cardinality fan-out in
> the middle. We want to know whether dropping the records with the
> UPDATE_BEFORE row kind affects the correctness of the final result.
>
> --
> *Best wishes,*
> *- Kai*
>
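To make the aggregation-with-filter case above concrete, here is a toy, in-memory sketch. It is not Flink's actual operators or the Elasticsearch connector API: the class, the `Kind` enum (a simplified stand-in for Flink's `RowKind`), and the methods `aggregate`, `filter`, `sink` and the flag `handleUpdateBefore` are all hypothetical. It assumes the aggregation emits INSERT for the first row of a key and an UPDATE_BEFORE/UPDATE_AFTER pair for every later change, and it models the ES index as an upsert map keyed by `word`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpdateBeforeDemo {

    // Simplified stand-in for Flink's RowKind (hypothetical, for illustration only).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Change(Kind kind, String word, long cnt) {}

    // Emulates "SELECT word, count(*) AS cnt FROM T GROUP BY word" as a changelog.
    static List<Change> aggregate(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String w : words) {
            Long old = counts.get(w);
            long updated = (old == null) ? 1 : old + 1;
            counts.put(w, updated);
            if (old == null) {
                out.add(new Change(Kind.INSERT, w, updated));
            } else {
                // Retract the previous result, then emit the new one.
                out.add(new Change(Kind.UPDATE_BEFORE, w, old));
                out.add(new Change(Kind.UPDATE_AFTER, w, updated));
            }
        }
        return out;
    }

    // Emulates "WHERE cnt < 3": the predicate is applied to every row, whatever its kind.
    static List<Change> filter(List<Change> in) {
        List<Change> out = new ArrayList<>();
        for (Change c : in) {
            if (c.cnt() < 3) {
                out.add(c);
            }
        }
        return out;
    }

    // Emulates an upsert sink keyed by word (like an ES index keyed by document id).
    // handleUpdateBefore = true treats UPDATE_BEFORE as DELETE; false silently drops it.
    static Map<String, Long> sink(List<Change> in, boolean handleUpdateBefore) {
        Map<String, Long> index = new LinkedHashMap<>();
        for (Change c : in) {
            switch (c.kind()) {
                case INSERT, UPDATE_AFTER -> index.put(c.word(), c.cnt());
                case DELETE -> index.remove(c.word());
                case UPDATE_BEFORE -> {
                    if (handleUpdateBefore) {
                        index.remove(c.word());
                    }
                }
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<Change> changelog = filter(aggregate(List.of("a", "b", "a", "a")));
        System.out.println("with UPDATE_BEFORE:    " + sink(changelog, true));
        System.out.println("without UPDATE_BEFORE: " + sink(changelog, false));
    }
}
```

With the input `a, b, a, a`, the filter drops UPDATE_AFTER (a, 3) but keeps UPDATE_BEFORE (a, 2). Treating that row as a DELETE leaves only `b=1` in the index, which is the correct result; dropping it leaves the stale `a=2`, even though the real count of `a` is 3 and no longer satisfies `cnt < 3`.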