Hi Charles,

This is not a bug. The primary keys declared by the sink do not exactly
match the input's changeLogUpsertKeys, so the planner falls back to the
before-and-after mode, in which both UPDATE_BEFORE and UPDATE_AFTER rows
are emitted. See [1] for more detail.

[1]
https://github.com/apache/flink/blob/d8630cb5db0608a630de95df0dd1d0c9f0b56aa2/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala#L819
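
The check described above can be sketched roughly as follows. This is a
simplified, standalone model of the rule as stated in this reply, not the
planner's actual code: the function name needs_update_before and the
representation of keys as sets of column names are assumptions made for
illustration only.

```python
from typing import FrozenSet, Iterable, Optional


def needs_update_before(
    sink_primary_key: Iterable[str],
    input_upsert_keys: Optional[Iterable[Iterable[str]]],
) -> bool:
    """Hypothetical helper: True when the sink should get both -U and +U rows.

    Models the rule "fall back to before-and-after unless the sink primary
    key exactly matches one of the input's upsert keys".
    """
    sink_pk: FrozenSet[str] = frozenset(sink_primary_key)
    if not sink_pk:
        # Assumption: with no declared primary key, this particular check
        # does not apply, so it does not force the fallback by itself.
        return False

    # Normalize every candidate upsert key to a set of column names.
    candidates = [frozenset(key) for key in (input_upsert_keys or [])]

    # An exact match is required; a subset or superset is not enough.
    return not any(key == sink_pk for key in candidates)


if __name__ == "__main__":
    # Sink key equals an input upsert key: upsert-only output is possible.
    print(needs_update_before(["orderID"], [["orderID"]]))
    # Input upsert key is wider than the sink key: fall back.
    print(needs_update_before(["orderID"], [["orderID", "c"]]))
    # No upsert key could be derived for the input: fall back.
    print(needs_update_before(["orderID"], None))
```

In this model, a missing or empty set of upsert keys is treated the same
way as a mismatch, which is why UPDATE_BEFORE rows can still appear even
when the requested changelog mode is upsert.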

Best,
Ron

On Tue, Sep 19, 2023 at 03:49, Charles Tan <ctangu...@gmail.com> wrote:

> Hi everyone,
>
> I noticed some unexpected behavior with Upsert changelogs in Flink 1.17.1
> and I wanted to post here to see if anyone has encountered a similar issue.
> I’m running a Flink application which performs SQL queries using the Flink
> SQL and Table APIs, then I convert the resulting table to a DataStream. The
> SQL statement I am running contains group by/aggregation, so when I convert
> from a Flink Table to a DataStream, it must be a changelog stream. The
> changelog mode I am using is “upsert”, so I expect all “UPDATE_BEFORE” rows
> to be dropped. However, in the case where my SQL query contains a HAVING
> clause, I noticed that “UPDATE_BEFORE” rows are not dropped.
>
> I’m thinking that this may be a bug, can somebody with more expertise
> confirm if this is a bug or if I’m misunderstanding upsert changelogs?
>
> Query I am trying to run:
> SELECT orderID, count(order_time) AS c FROM orders GROUP BY orderID HAVING
> count(order_time)=1;
>
> Input:
> +----+----------------------+--------------------------------+
> | op |           order_time |                        orderID |
> +----+----------------------+--------------------------------+
> | +I |                 1000 |                        Order_1 |
> | +I |                 2000 |                        Order_1 |
> | +I |                 3000 |                        Order_1 |
> | +I |                 4000 |                        Order_1 |
> | +I |                 5000 |                        Order_1 |
> | +I |                 6000 |                        Order_1 |
> +----+----------------------+--------------------------------+
>
> Output:
> 6> +I[Order_1, 1]
> 6> -U[Order_1, 1]
>
> Here is the link to a github repository that contains an example. (
> https://github.com/charles-tan/flink-upsert-changelog-bug)
>
> Thanks,
> Charles
>
