Yes. There is also a Flink Forward session [1] (starting at 14:00) that talks about the internals of the underlying changelog mechanism, with a visual example.
Best,
Jark

[1]: https://www.youtube.com/watch?v=KDD8e4GE12w&list=PLDX4T_cnKjD0ngnBSU-bYGfgVv17MiwA7&index=48&t=820s

On Thu, 5 Nov 2020 at 15:48, Henry Dai <dhythe...@gmail.com> wrote:

> Hi Jark,
>
> Thanks for your reply, it helps me a lot!
>
> I have tested the mini-batch optimization; the result shows it greatly
> reduces the number of records produced when a Flink Table is converted
> to a retract DataStream.
>
> It seems I had a wrong understanding of Flink's "Dynamic Table" concept:
> if a record R1 arrives at the SQL computation, the result table's data
> view is V1 before R1 is processed and V2 afterwards, and I used to
> believe the changelog is simply Diff(V2, V1).
>
> Actually, there are a lot of intermediate changes emitted while
> processing R1.
>
> Thanks!
>
> On Thu, 5 Nov 2020 at 11:36, Jark Wu <imj...@gmail.com> wrote:
>
>> Thanks Henry for the detailed example.
>>
>> I will explain why there are so many records at time 5.
>> That is because the retraction mechanism in Flink SQL is triggered per
>> record, so there is record amplification in your case.
>> At time 5, the LAST_VALUE aggregation for stream a will first emit
>> -(1, 12345, 0) and then +(1, 12345, 1).
>> When the -(1, 12345, 0) arrives at the join operator, it joins the
>> previous 3 records of stream b and sends 3 retraction messages.
>> When those 3 retraction messages arrive at the sum aggregation, it
>> produces (F 33)(T 21)(F 21)(T 10)(F 10).
>> In contrast, when the +(1, 12345, 1) arrives at the join operator, it
>> sends 3 joined accumulation messages to the sum aggregation, which
>> produces (T 12)(F 12)(T 23)(F 23)(T 33).
>>
>> In Flink SQL, the mini-batch [1] optimization can reduce this
>> amplification, because it is triggered on a mini-batch of records.
>>
>> Best,
>> Jark
>>
>> [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>>
>> On Wed, 4 Nov 2020 at 23:01, Henry Dai <dhythe...@gmail.com> wrote:
>>
>>> Dear Flink developers & users,
>>>
>>> I have a question about Flink SQL; it gives me a lot of trouble.
>>> Thank you very much for your help.
>>>
>>> Let's assume we have two data streams, `order` and `order_detail`,
>>> coming from MySQL binlog.
>>>
>>> Table `order` schema:
>>>     id        int  primary key
>>>     order_id  int
>>>     status    int
>>>
>>> Table `order_detail` schema:
>>>     id        int  primary key
>>>     order_id  int
>>>     quantity  int
>>>
>>> order : order_detail = 1:N, joined by `order_id`.
>>>
>>> Suppose we have the following data sequence, and we compute
>>> sum(quantity) grouped by order.order_id after they are joined:
>>>
>>> time  order                   order_detail              result
>>>       id  order_id  status    id  order_id  quantity
>>> 1     1   12345     0
>>> 2                             1   12345     10          (T 10)
>>> 3                             2   12345     11          (F 10)(T 21)
>>> 4                             3   12345     12          (F 21)(T 33)
>>> 5     1   12345     1                                   (F 33)(T 21)(F 21)(T 10)(F 10)(T 12)(F 12)(T 23)(F 23)(T 33)
>>>
>>> Code:
>>>     tableEnv.registerTableSource("a", new Order());
>>>     tableEnv.registerTableSource("b", new OrderDetail());
>>>     Table tbl1 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(status) AS status FROM a GROUP BY id");
>>>     tableEnv.registerTable("ax", tbl1);
>>>     Table tbl2 = tableEnv.sqlQuery("SELECT id, LAST_VALUE(order_id) AS order_id, LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
>>>     tableEnv.registerTable("bx", tbl2);
>>>     Table table = tableEnv.sqlQuery("SELECT ax.order_id, SUM(bx.quantity) FROM ax JOIN bx ON ax.order_id = bx.order_id GROUP BY ax.order_id");
>>>     DataStream<Tuple2<Boolean, Row>> stream = tableEnv.toRetractStream(table, Row.class);
>>>     stream.print();
>>>
>>> Result:
>>>     (true,12345,10)
>>>     (false,12345,10)
>>>     (true,12345,21)
>>>     (false,12345,21)
>>>     (true,12345,33)
>>>     (false,12345,33)
>>>     (true,12345,21)
>>>     (false,12345,21)
>>>     (true,12345,10)
>>>     (false,12345,10)
>>>     (true,12345,12)
>>>     (false,12345,12)
>>>     (true,12345,23)
>>>     (false,12345,23)
>>>     (true,12345,33)
>>>
>>> I can't understand why Flink emits so many records at time 5.
>>>
>>> In production, we consume the binlog stream from Kafka, convert it to
>>> a Flink table, and after the SQL computation convert the result table
>>> back to a stream, where we keep only the TRUE messages of the retract
>>> stream and emit them to downstream Kafka.
>>>
>>> Do we have some way to realize a truly dynamic table (I mean, trigger
>>> the computation only once), so that when we receive (1,12345,1) from
>>> `order`, we only emit (F 33)(T 33)?
>>>
>>> --
>>> best wishes
>>> hengyu
>>
>
> --
> best wishes
> hengyu
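The per-record amplification Jark describes for time 5 can be reproduced outside Flink with a small stand-alone sketch of a retractable SUM aggregate (plain Java, no Flink; the class name, message format, and the order in which the join emits the three retractions/accumulations are assumptions chosen to match the thread's output): each incoming change first retracts the previously emitted sum as (F x), then emits the new one as (T y), except when the group was, or becomes, empty.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch (not Flink code) of a retractable SUM aggregate.
// Every change retracts the previously emitted sum and emits the new one,
// mirroring the (F x)(T y) pairs discussed in the thread.
public class RetractSumSketch {
    private long sum = 0;
    private long rowCount = 0;              // joined rows currently in the group
    private final List<String> out = new ArrayList<>();

    // accumulate=true adds a joined row, accumulate=false retracts one
    void apply(boolean accumulate, long quantity) {
        if (rowCount > 0) {
            out.add("(F " + sum + ")");     // retract previously emitted sum
        }
        sum += accumulate ? quantity : -quantity;
        rowCount += accumulate ? 1 : -1;
        if (rowCount > 0) {
            out.add("(T " + sum + ")");     // emit new sum
        }
    }

    static String time5Messages() {
        RetractSumSketch agg = new RetractSumSketch();
        // times 2-4: order_detail rows with quantities 10, 11, 12 join in
        agg.apply(true, 10);
        agg.apply(true, 11);
        agg.apply(true, 12);
        agg.out.clear();                    // keep only the time-5 output
        // time 5: -(1,12345,0) retracts the 3 joined rows,
        // then +(1,12345,1) re-accumulates them (order assumed)
        agg.apply(false, 12);
        agg.apply(false, 11);
        agg.apply(false, 10);
        agg.apply(true, 12);
        agg.apply(true, 11);
        agg.apply(true, 10);
        return String.join("", agg.out);
    }

    public static void main(String[] args) {
        // prints (F 33)(T 21)(F 33)... pairs matching the thread's time-5 trace
        System.out.println(time5Messages());
    }
}
```

Running this reproduces exactly the ten time-5 messages from the thread, which shows the amplification is a consequence of per-record retract/accumulate semantics rather than of the join or the data.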
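For reference, the mini-batch optimization Jark links to is enabled through the table configuration. A sketch along the lines of the linked documentation page (here `tableEnv` is the thread's table environment, and the latency/size values are illustrative, not recommendations):

```java
// Enable mini-batch processing so retract/accumulate pairs within one
// batch can be folded together instead of being emitted per record.
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // max time to buffer input
conf.setString("table.exec.mini-batch.size", "5000");         // max records to buffer
```

Note that mini-batch reduces the amplification but does not guarantee a single Diff(V2, V1) changelog per input record; intermediate changes within a batch are merged only as far as the batch boundaries allow.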