Thanks Henry for the detailed example,

Let me explain why there are so many records at time 5: the retraction
mechanism in Flink SQL is triggered per record, so there is record
amplification in your case.
At time 5, the LAST_VALUE aggregation for stream a first emits -(1,
12345, 0) and then +(1, 12345, 0).
When the -(1, 12345, 0) arrives at the join operator, it joins with the
previous 3 records from stream b and sends 3 retraction messages.
When those 3 retraction messages arrive at the sum aggregation, it
produces (F 33)(T 21)(F 21)(T 10)(F 10).
Then, when the +(1, 12345, 0) arrives at the join operator, it sends 3
joined accumulation messages to the sum aggregation, which produces
(T 12)(F 12)(T 23)(F 23)(T 33).
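The sequence at time 5 can be reproduced with a small standalone sketch of a retract-mode SUM accumulator. Note this is only an illustration: the class and method names are made up, and the join-state iteration order (12, 11, 10) is inferred from your observed output, not a documented Flink guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of per-record retract handling in a SUM aggregation.
public class RetractSumDemo {
    private final List<String> emitted = new ArrayList<>();
    private int sum = 0;
    private int rows = 0; // accumulated rows in the group

    void accumulate(int q) {
        if (rows > 0) emitted.add("(F " + sum + ")"); // retract old result
        sum += q;
        rows++;
        emitted.add("(T " + sum + ")");               // emit new result
    }

    void retract(int q) {
        emitted.add("(F " + sum + ")");               // retract old result
        sum -= q;
        rows--;
        if (rows > 0) emitted.add("(T " + sum + ")"); // group still non-empty
    }

    static String timeFive() {
        RetractSumDemo agg = new RetractSumDemo();
        // State after times 2-4: quantities 10, 11, 12 accumulated.
        agg.accumulate(10);
        agg.accumulate(11);
        agg.accumulate(12);
        agg.emitted.clear();
        // Time 5: -(1,12345,0) joins 3 rows -> 3 retractions,
        // then +(1,12345,0) joins 3 rows -> 3 accumulations.
        agg.retract(12);
        agg.retract(11);
        agg.retract(10);
        agg.accumulate(12);
        agg.accumulate(11);
        agg.accumulate(10);
        return String.join("", agg.emitted);
    }

    public static void main(String[] args) {
        // prints (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
        System.out.println(timeFive());
    }
}
```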

In Flink SQL, the mini-batch [1] optimization can reduce
this amplification, because aggregations are then triggered on a
mini-batch of records rather than on every single record.
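For reference, mini-batch can be enabled through the table configuration, roughly like this (the option keys are taken from the tuning page linked below; please double-check them and the syntax against your Flink version):

```java
// Sketch: enable mini-batch aggregation on an existing TableEnvironment.
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");      // turn on mini-batch
conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // buffer up to 5 seconds
conf.setString("table.exec.mini-batch.size", "5000");         // or up to 5000 records
```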

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation


On Wed, 4 Nov 2020 at 23:01, Henry Dai <dhythe...@gmail.com> wrote:

>
> Dear flink developers&users
>
> I have a question about Flink SQL that has been giving me a lot of
> trouble. Thank you very much for any help.
>
>     Let's assume we have two data streams, `order` and `order_detail`,
> which come from a MySQL binlog.
>
>     Table `order` schema:
>     id              int     primary key
>     order_id    int
>     status        int
>
>     Table `order_detail` schema:
>     id               int     primary key
>     order_id     int
>     quantity      int
>
>     order : order_detail = 1:N, they are joined by `order_id`
>
>     Suppose we have the following data sequence, and we compute
> sum(quantity) grouped by order.order_id after the join:
>
> time    order                       order_detail                result
>         id   order_id   status      id   order_id   quantity
> 1       1    12345      0
> 2                                   1    12345      10          (T 10)
> 3                                   2    12345      11          (F 10)(T 21)
> 4                                   3    12345      12          (F 21)(T 33)
> 5       1    12345      1                                       (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
>
>
>     Code:
>     tableEnv.registerTableSource("a", new Order());
>     tableEnv.registerTableSource("b", new OrderDetail());
>     Table tbl1 = tableEnv.sqlQuery(
>         "SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(status) AS status FROM a GROUP BY id");
>     tableEnv.registerTable("ax", tbl1);
>     Table tbl2 = tableEnv.sqlQuery(
>         "SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
>     tableEnv.registerTable("bx", tbl2);
>     Table table = tableEnv.sqlQuery(
>         "SELECT ax.order_id, SUM(bx.quantity) FROM ax JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
>     DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class);
>     stream.print();
>
>     Result:
>     (true,12345,10)
>     (false,12345,10)
>     (true,12345,21)
>     (false,12345,21)
>     (true,12345,33)
>     (false,12345,33)
>     (true,12345,21)
>     (false,12345,21)
>     (true,12345,10)
>     (false,12345,10)
>     (true,12345,12)
>     (false,12345,12)
>     (true,12345,23)
>     (false,12345,23)
>     (true,12345,33)
>
>
>     I can't understand why Flink emits so many records at time 5.
>
>     In production, we consume the binlog stream from Kafka, convert the
> stream to a Flink table, and after the SQL computation convert the result
> table back to a stream, where we keep only the TRUE messages of the
> retract stream and emit them to downstream Kafka.
>
>     Is there a way to make the Flink dynamic table trigger the computation
> only once, so that when we receive (1,12345,1) from `order`, it emits
> only (F 33)(T 33)?
>
> --
> best wishes
> hengyu
>