Hi everyone,

I noticed some unexpected behavior with upsert changelogs in Flink 1.17.1
and wanted to post here to see if anyone has encountered a similar issue.
I’m running a Flink application that runs SQL queries through the Flink
SQL and Table APIs and then converts the resulting table to a DataStream.
The SQL statement contains a GROUP BY aggregation, so converting the Flink
Table to a DataStream produces a changelog stream. I am using the “upsert”
changelog mode, so I expect all UPDATE_BEFORE rows to be dropped. However,
when my SQL query contains a HAVING clause, UPDATE_BEFORE rows are not
dropped.
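To make the expectation concrete, here is a plain-Java toy model (not Flink code; the class, record and method names are hypothetical). It assumes that an upsert view of a changelog keeps only +I, +U and -D rows, mirroring the row kinds listed for Flink's ChangelogMode.upsert(), and discards every -U row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of an upsert view; not Flink's implementation.
public class UpsertModeSketch {
    // The four row kinds Flink prints as +I, -U, +U, -D.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    record Change(Kind kind, String key, long count) {
        @Override
        public String toString() {
            String prefix = switch (kind) {
                case INSERT -> "+I";
                case UPDATE_BEFORE -> "-U";
                case UPDATE_AFTER -> "+U";
                case DELETE -> "-D";
            };
            return prefix + "[" + key + ", " + count + "]";
        }
    }

    // Assumed upsert semantics: keep INSERT, UPDATE_AFTER and DELETE,
    // drop every UPDATE_BEFORE row.
    static List<Change> toUpsert(List<Change> changelog) {
        List<Change> out = new ArrayList<>();
        for (Change c : changelog) {
            if (c.kind() != Kind.UPDATE_BEFORE) {
                out.add(c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changelog = List.of(
                new Change(Kind.INSERT, "Order_1", 1),
                new Change(Kind.UPDATE_BEFORE, "Order_1", 1),
                new Change(Kind.UPDATE_AFTER, "Order_1", 2));
        // Prints +I[Order_1, 1] and +U[Order_1, 2]; the -U row is gone.
        for (Change c : toUpsert(changelog)) {
            System.out.println(c);
        }
    }
}
```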

I think this may be a bug. Could someone with more expertise confirm
whether this is a bug or whether I’m misunderstanding upsert changelogs?

Query I am trying to run:
SELECT orderID, count(order_time) AS c
FROM orders
GROUP BY orderID
HAVING count(order_time) = 1;

Input:
+----+----------------------+--------------------------------+
| op |           order_time |                        orderID |
+----+----------------------+--------------------------------+
| +I |                 1000 |                        Order_1 |
| +I |                 2000 |                        Order_1 |
| +I |                 3000 |                        Order_1 |
| +I |                 4000 |                        Order_1 |
| +I |                 5000 |                        Order_1 |
| +I |                 6000 |                        Order_1 |
+----+----------------------+--------------------------------+

Output:
6> +I[Order_1, 1]
6> -U[Order_1, 1]
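One way to see where the -U row could come from is another plain-Java toy model (not Flink's implementation; all names are hypothetical). It assumes that the GROUP BY aggregation emits a retraction pair (-U with the old count, +U with the new count) for every update, and that the HAVING predicate is then applied to each row regardless of its row kind. Every order_time in the input is non-null, so the model simply counts rows per key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical toy model of GROUP BY + HAVING on a retracting changelog.
public class HavingChangelogSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

    record Change(Kind kind, String key, long count) {
        @Override
        public String toString() {
            String prefix = switch (kind) {
                case INSERT -> "+I";
                case UPDATE_BEFORE -> "-U";
                case UPDATE_AFTER -> "+U";
            };
            return prefix + "[" + key + ", " + count + "]";
        }
    }

    // Assumed aggregation behavior: first row for a key emits +I,
    // every later row emits -U(old count) followed by +U(new count).
    static List<Change> countWithRetractions(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String key : keys) {
            Long old = counts.get(key);
            long next = (old == null) ? 1 : old + 1;
            counts.put(key, next);
            if (old == null) {
                out.add(new Change(Kind.INSERT, key, next));
            } else {
                out.add(new Change(Kind.UPDATE_BEFORE, key, old));
                out.add(new Change(Kind.UPDATE_AFTER, key, next));
            }
        }
        return out;
    }

    // Assumed HAVING behavior: the predicate sees only the row's values,
    // not its row kind.
    static List<Change> having(List<Change> changelog, LongPredicate predicate) {
        List<Change> out = new ArrayList<>();
        for (Change c : changelog) {
            if (predicate.test(c.count())) {
                out.add(c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Six rows for Order_1, as in the input table above.
        List<String> orders = Collections.nCopies(6, "Order_1");
        for (Change c : having(countWithRetractions(orders), count -> count == 1)) {
            System.out.println(c);
        }
    }
}
```

Under these assumptions the model prints the same two rows as the output above: +U[Order_1, 2] through +U[Order_1, 6] fail the predicate, while -U[Order_1, 1] passes it. In this model the -U row is the only record telling downstream that Order_1 no longer satisfies the HAVING clause, so simply dropping it would leave a consumer holding [Order_1, 1].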

Here is a link to a GitHub repository that contains an example:
https://github.com/charles-tan/flink-upsert-changelog-bug

Thanks,
Charles
