Thank you for the reply, Jark. In our case, we found that there are no UPDATE_BEFORE records generated since the join is using -D/+I row kinds.
*> Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U" represents "UPDATE_AFTER",* * "-U" represents "UPDATE_BEFORE". We forward input RowKind if it is inner join, otherwise, we always send insert and delete for simplification. We can optimize this to send -U & +U instead of D & I in the future (see FLINK-17337). They are equivalent in this join case. It may need some refactoring if we want to send -U & +U, so we still keep -D & +I for now for simplification.* On Mon, Jun 28, 2021 at 2:21 PM Jark Wu <imj...@gmail.com> wrote: > UPDATE_BEFORE is required in cases such as Aggregation with Filter. For > example: > > SELECT * > FROM ( > SELECT word, count(*) as cnt > FROM T > GROUP BY word > ) WHERE cnt < 3; > > There is more discussion in this issue: > https://issues.apache.org/jira/browse/FLINK-9528 > > Best, > Jark > > On Mon, 28 Jun 2021 at 13:52, Kai Fu <zzfu...@gmail.com> wrote: > >> Hi team, >> >> We notice the UPDATE_BEFORE row kind is not dropped and treated as DELETE >> as in code >> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L81-L84>. >> We're aware that this is useful to retract output records in some cases, >> but we cannot come up with such a scenario, could anyone name a few cases >> for it. >> >> The other thing we want to do is drop the UPDATE_BEFORE row kind in the >> ES connector to reduce the sink traffic since almost all of our records are >> update. In our case, the records are generated by joining with a couple of >> upsert-kafka data sources. Only primary-key participants in the join >> condition for all join cases, with some granularity/cardinality fan-out in >> the middle. We want to know whether it impacts the final result correctness >> if we drop the records with UPDATE_BEFORE row kind. >> >> -- >> *Best wishes,* >> *- Kai* >> > -- *Best wishes,* *- Kai*