UPDATE_BEFORE is required in cases such as Aggregation with Filter. For
example:

SELECT *
FROM (
  SELECT word, count(*) as cnt
  FROM T
  GROUP BY word
) WHERE cnt < 3;

There is more discussion in this issue:
https://issues.apache.org/jira/browse/FLINK-9528

Best,
Jark

On Mon, 28 Jun 2021 at 13:52, Kai Fu <zzfu...@gmail.com> wrote:

> Hi team,
>
> We notice the UPDATE_BEFORE row kind is not dropped and treated as DELETE
> as in code
> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L81-L84>.
> We're aware that this is useful to retract output records in some cases,
> but we cannot come up with such a scenario, could anyone name a few cases
> for it.
>
> The other thing we want to do is drop the UPDATE_BEFORE row kind in the ES
> connector to reduce the sink traffic since almost all of our records are
> update. In our case, the records are generated by joining with a couple of
> upsert-kafka data sources. Only primary-key participants in the join
> condition for all join cases, with some granularity/cardinality fan-out in
> the middle. We want to know whether it impacts the final result correctness
> if we drop the records with UPDATE_BEFORE row kind.
>
> --
> *Best wishes,*
> *- Kai*
>

Reply via email to