Thank you for the reply, Jark. In our case, we found that no
UPDATE_BEFORE records are generated, since the join uses -D/+I row kinds.
> Note: "+I" represents "INSERT", "-D" represents "DELETE", "+U"
> represents "UPDATE_AFTER", "-U" represents "UPDATE_BEFORE". We forward input RowK
UPDATE_BEFORE is required in cases such as Aggregation with Filter. For
example:
SELECT *
FROM (
  SELECT word, count(*) AS cnt
  FROM T
  GROUP BY word
) WHERE cnt < 3;
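To make the reasoning concrete, here is a small Python sketch of the query above. It is a toy model and not Flink's actual implementation: the helper names (aggregate, filter_cnt, materialize) and the sample input are assumptions made up for illustration. It shows that when the aggregation emits no UPDATE_BEFORE (-U), the filter silently drops the update that crosses the cnt < 3 threshold, and the downstream result keeps a stale row.

```python
from collections import Counter

def aggregate(words, emit_update_before):
    """Toy changelog for: SELECT word, count(*) AS cnt FROM T GROUP BY word."""
    counts = Counter()
    for w in words:
        old = counts[w]
        counts[w] += 1
        if old == 0:
            yield ("+I", w, 1)            # first row for this key
        else:
            if emit_update_before:
                yield ("-U", w, old)      # retract the previous count
            yield ("+U", w, old + 1)      # emit the new count

def filter_cnt(changelog, limit=3):
    """Toy filter for WHERE cnt < limit, applied record by record."""
    return [r for r in changelog if r[2] < limit]

def materialize(changelog):
    """Apply the changelog to a keyed result table (hypothetical sink)."""
    table = {}
    for kind, w, cnt in changelog:
        if kind in ("+I", "+U"):
            table[w] = cnt
        else:                             # "-U" or "-D" removes the row
            table.pop(w, None)
    return table

words = ["a", "b", "a", "a"]              # made-up input: "a" reaches cnt = 3

with_ub = materialize(filter_cnt(aggregate(words, True)))
without_ub = materialize(filter_cnt(aggregate(words, False)))

print(with_ub)      # {'b': 1}            "a" was retracted before it hit 3
print(without_ub)   # {'a': 2, 'b': 1}    stale: "a" should no longer be present
```

With -U records, the retraction of (a, 2) passes the filter and removes the row, so the result matches the query. Without them, the only record that could invalidate (a, 2) is +U (a, 3), which the filter discards.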
There is more discussion in this issue:
https://issues.apache.org/jira/browse/FLINK-9528
Best,
Jark
On Mon, 28 Jun 2021 at 13