Thanks Henry for the detailed example. I will explain why there are so many records at time 5. The retraction mechanism in Flink SQL is triggered per record, so there is record amplification in your case. At time 5, the LAST_VALUE aggregation for stream a first emits -(1, 12345, 0) and then +(1, 12345, 1). When -(1, 12345, 0) arrives at the join operator, it joins the previous 3 records in stream b and sends 3 retraction messages. When those 3 retraction messages arrive at the sum aggregation, it produces (F 33)(T 21)(F 21)(T 10)(F 10). In contrast, when +(1, 12345, 1) arrives at the join operator, it sends 3 joined accumulation messages to the sum aggregation, which produces (T 12)(F 12)(T 23)(F 23)(T 33).
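The per-record (F ...)(T ...) sequence above can be reproduced with a small standalone sketch (plain Java, no Flink required). It only models the sum aggregation's reaction to the 3 join retractions followed by the 3 join accumulations; the starting state (sum 33 over 3 joined rows with quantities 10, 11, 12) comes from your example:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of how the SUM aggregation reacts to each retract/accumulate
// message individually. Before time 5, group 12345 holds three joined rows
// with quantities 10, 11, 12 (sum = 33). The LAST_VALUE change first retracts
// the three joined rows, then accumulates them again.
public class RetractAmplification {

    static List<String> process() {
        int sum = 33;   // current SUM(quantity) for order_id 12345
        int count = 3;  // number of joined rows currently in the group
        // 3 join retractions, then 3 join accumulations
        int[] deltas = {-12, -11, -10, +12, +11, +10};
        List<String> out = new ArrayList<>();
        for (int d : deltas) {
            if (count > 0) out.add("(F " + sum + ")"); // retract the previous result
            sum += d;
            count += (d < 0) ? -1 : 1;
            if (count > 0) out.add("(T " + sum + ")"); // emit the new result
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints: (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
        System.out.println(String.join("", process()));
    }
}
```

Note that 10 messages are produced for a single upstream change; that is the amplification.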
In Flink SQL, the mini-batch [1] optimization can reduce this amplification, because the aggregation is triggered on a mini-batch of records instead of per record.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation

On Wed, 4 Nov 2020 at 23:01, Henry Dai <dhythe...@gmail.com> wrote:
>
> Dear flink developers & users,
>
> I have a question about Flink SQL. It gives me a lot of trouble; thank
> you very much for any help.
>
> Let's assume we have two data streams, `order` and `order_detail`,
> which come from a MySQL binlog.
>
> Table `order` schema:
>     id        int  primary key
>     order_id  int
>     status    int
>
> Table `order_detail` schema:
>     id        int  primary key
>     order_id  int
>     quantity  int
>
> order : order_detail = 1:N, joined by `order_id`.
>
> Assume the following data sequence, where we compute sum(quantity)
> group by order.order_id after the join:
>
> time | order (id, order_id, status) | order_detail (id, order_id, quantity) | result
> -----+------------------------------+---------------------------------------+-------
>  1   | (1, 12345, 0)                |                                       |
>  2   |                              | (1, 12345, 10)                        | (T 10)
>  3   |                              | (2, 12345, 11)                        | (F 10)(T 21)
>  4   |                              | (3, 12345, 12)                        | (F 21)(T 33)
>  5   | (1, 12345, 1)                |                                       | (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
>
> Code:
>
>     tableEnv.registerTableSource("a", new Order());
>     tableEnv.registerTableSource("b", new OrderDetail());
>     Table tbl1 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, " +
>         "LAST_VALUE(status) AS status FROM a GROUP BY id");
>     tableEnv.registerTable("ax", tbl1);
>     Table tbl2 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, " +
>         "LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
>     tableEnv.registerTable("bx", tbl2);
>     Table table = tableEnv.sqlQuery("SELECT ax.order_id, SUM(bx.quantity) " +
>         "FROM ax JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
>     DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class);
>     stream.print();
>
> Result:
>
>     (true,12345,10)
>     (false,12345,10)
>     (true,12345,21)
>     (false,12345,21)
>     (true,12345,33)
>     (false,12345,33)
>     (true,12345,21)
>     (false,12345,21)
>     (true,12345,10)
>     (false,12345,10)
>     (true,12345,12)
>     (false,12345,12)
>     (true,12345,23)
>     (false,12345,23)
>     (true,12345,33)
>
> I can't understand why Flink emits so many records at time 5.
>
> In production, we consume the binlog stream from Kafka, convert the stream
> to a Flink table, and after the SQL computation convert the result table
> back to a stream, where we preserve only the TRUE messages of the retract
> stream and emit them to a downstream Kafka topic.
>
> Is there a way to realize a truly dynamic table in Flink (I mean, trigger
> the computation only once), so that when we receive (1, 12345, 1) from
> `order`, we only emit (F 33)(T 33)?
>
> --
> best wishes
> hengyu
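(Editor's note: the mini-batch optimization Jark references is enabled through table configuration. A sketch of the relevant options, with names taken from the linked tuning page; the latency and size values below are illustrative, not recommendations:)

```
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5 s
table.exec.mini-batch.size: 5000
```

With these set, the aggregation buffers input records for up to the allowed latency (or until the batch size is reached) and emits one result per key per batch, so many of the intermediate (F ...)(T ...) pairs at time 5 are folded away.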