Hi everyone, I noticed some unexpected behavior with upsert changelogs in Flink 1.17.1 and wanted to post here to see if anyone has encountered a similar issue. I'm running a Flink application that runs SQL queries through the Flink SQL and Table APIs and then converts the resulting Table to a DataStream. The SQL statement I am running contains a GROUP BY aggregation, so the conversion from a Flink Table to a DataStream must produce a changelog stream. The changelog mode I am using is "upsert", so I expect all UPDATE_BEFORE rows to be dropped. However, when my SQL query contains a HAVING clause, I noticed that UPDATE_BEFORE rows are not dropped.
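To make the expectation concrete, here is a toy model in plain Java (no Flink dependency) of a running COUNT per key, the kind of result a GROUP BY aggregation produces. It is a sketch under stated assumptions, not Flink's implementation: the enum `Kind` is a hypothetical stand-in that only mirrors the names of Flink's `RowKind` values, and `countChangelog`, `toUpsert` and `having` are hypothetical helpers. `toUpsert` encodes the expectation that upsert mode never emits UPDATE_BEFORE, and `having` assumes a HAVING predicate such as `c = 1` is applied independently to each changelog row.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: plain Java, no Flink dependency. All names here are hypothetical.
public class UpsertChangelogModel {

    // Hypothetical stand-in mirroring the names of Flink's RowKind values.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One changelog row for the result (orderID, c).
    record Change(Kind kind, String orderId, long count) {
        @Override
        public String toString() {
            String op = switch (kind) {
                case INSERT -> "+I";
                case UPDATE_BEFORE -> "-U";
                case UPDATE_AFTER -> "+U";
                case DELETE -> "-D";
            };
            return op + "[" + orderId + ", " + count + "]";
        }
    }

    // Retract-style changelog of a running COUNT for one key:
    // +I for the first input row, then a -U/+U pair for every later row.
    static List<Change> countChangelog(String orderId, int rows) {
        List<Change> out = new ArrayList<>();
        for (long c = 1; c <= rows; c++) {
            if (c == 1) {
                out.add(new Change(Kind.INSERT, orderId, c));
            } else {
                out.add(new Change(Kind.UPDATE_BEFORE, orderId, c - 1));
                out.add(new Change(Kind.UPDATE_AFTER, orderId, c));
            }
        }
        return out;
    }

    // The expectation for upsert mode: UPDATE_BEFORE rows are never emitted.
    static List<Change> toUpsert(List<Change> changes) {
        List<Change> out = new ArrayList<>();
        for (Change ch : changes) {
            if (ch.kind() != Kind.UPDATE_BEFORE) {
                out.add(ch);
            }
        }
        return out;
    }

    // Assumption: a HAVING c = target predicate checked against each changelog row on its own.
    static List<Change> having(List<Change> changes, long target) {
        List<Change> out = new ArrayList<>();
        for (Change ch : changes) {
            if (ch.count() == target) {
                out.add(ch);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Six input rows for Order_1, as in the example input of this post.
        List<Change> retract = countChangelog("Order_1", 6);
        System.out.println("HAVING on retract stream: " + having(retract, 1));
        System.out.println("HAVING on upsert stream:  " + having(toUpsert(retract), 1));
    }
}
```

In this model, filtering the retract stream prints `[+I[Order_1, 1], -U[Order_1, 1]]`, the same two rows reported in this post, while filtering after UPDATE_BEFORE has been dropped prints only `[+I[Order_1, 1]]`, leaving nothing to tell a consumer that Order_1 stopped matching. Whether Flink's planner reasons along these lines is not something the model confirms.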
I'm thinking that this may be a bug. Can somebody with more expertise confirm whether this is a bug or whether I'm misunderstanding upsert changelogs?

Query I am trying to run:

SELECT orderID, count(order_time) AS c FROM orders GROUP BY orderID HAVING count(order_time)=1;

Input:

+----+----------------------+--------------------------------+
| op |           order_time |                        orderID |
+----+----------------------+--------------------------------+
| +I |                 1000 |                        Order_1 |
| +I |                 2000 |                        Order_1 |
| +I |                 3000 |                        Order_1 |
| +I |                 4000 |                        Order_1 |
| +I |                 5000 |                        Order_1 |
| +I |                 6000 |                        Order_1 |
+----+----------------------+--------------------------------+

Output:

6> +I[Order_1, 1]
6> -U[Order_1, 1]

Here is a link to a GitHub repository that contains an example: https://github.com/charles-tan/flink-upsert-changelog-bug

Thanks,
Charles