Hi, Charles It is not a bug, this is because the primary keys provided by sink are not exactly match input changeLogUpsertKeys, so fallback to before and after mode, you can see [1] for more detail.
[1] https://github.com/apache/flink/blob/d8630cb5db0608a630de95df0dd1d0c9f0b56aa2/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala#L819 Best, Ron Charles Tan <ctangu...@gmail.com> 于2023年9月19日周二 03:49写道: > Hi everyone, > > I noticed some unexpected behavior with Upsert changelogs in Flink 1.17.1 > and I wanted to post here to see if anyone has encountered a similar issue. > I’m running a Flink application which performs SQL queries using the Flink > SQL and Table APIs, then I convert the resulting table to a DataStream. The > SQL statement I am running contains group by/aggregation, so when I convert > from a Flink Table to a DataStream, it must be a changelog stream. The > changelog mode I am using is “upsert”, so I expect all “UPDATE_BEFORE” rows > to be dropped. However, in the case where my SQL query contains a HAVING > clause, I noticed that “UPDATE_BEFORE” rows are not dropped. > > I’m thinking that this may be a bug, can somebody with more expertise > confirm if this is a bug or if I’m misunderstanding upsert changelogs? > > Query I am trying to run: > SELECT orderID, count(order_time) AS c FROM orders GROUP BY orderID HAVING > count(order_time)=1; > > Input: > +----+----------------------+--------------------------------+ > | op | order_time | orderID | > +----+----------------------+--------------------------------+ > | +I | 1000 | Order_1 | > | +I | 2000 | Order_1 | > | +I | 3000 | Order_1 | > | +I | 4000 | Order_1 | > | +I | 5000 | Order_1 | > | +I | 6000 | Order_1 | > +----+----------------------+--------------------------------+ > > Output: > 6> +I[Order_1, 1] > 6> -U[Order_1, 1] > > Here is the link to a github repository that contains an example. ( > https://github.com/charles-tan/flink-upsert-changelog-bug) > > Thanks, > Charles >