Yes.

There is also a Flink Forward session [1] (from around 14:00) that covers
the internals of the underlying changelog mechanism with a visual example.

Best,
Jark

[1]:
https://www.youtube.com/watch?v=KDD8e4GE12w&list=PLDX4T_cnKjD0ngnBSU-bYGfgVv17MiwA7&index=48&t=820s

On Thu, 5 Nov 2020 at 15:48, Henry Dai <dhythe...@gmail.com> wrote:

> Hi Jark,
>
> Thanks for your reply, it helps me a lot!
>
> I have tested the mini-batch optimization, and the results show that it
> greatly reduces the number of records produced when a Flink Table is
> converted to a retract DataStream.
>
> It seems I misunderstood Flink's "Dynamic Table" concept in the past.
> Suppose a record R1 arrives at the SQL computation: before R1 is
> processed, the Result Table's data view is V1, and after R1 is processed,
> the view becomes V2. I used to believe the changelog was simply
> Diff(V2, V1).
>
> Actually, there are a lot of intermediate changes while R1 is being
> processed.
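
The gap between the per-record changelog and Diff(V2, V1) can be illustrated with a small stand-alone sketch. It is plain Java, not Flink code: `ChangelogDiff` and `netDiff` are hypothetical names, and messages are modelled as strings in the (T n)/(F n) notation used in this thread. The sketch cancels every retraction against a matching accumulation and returns what is left, which is the net difference between the two views.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Flink code: folds a retract stream into its net effect.
final class ChangelogDiff {

    // Each message looks like "(T 33)" (accumulate) or "(F 33)" (retract).
    static List<String> netDiff(List<String> changelog) {
        // Net count per row: +1 for every accumulate, -1 for every retract.
        Map<String, Integer> net = new LinkedHashMap<>();
        for (String msg : changelog) {
            boolean accumulate = msg.startsWith("(T ");
            String row = msg.substring(3, msg.length() - 1);
            net.merge(row, accumulate ? 1 : -1, Integer::sum);
        }
        List<String> diff = new ArrayList<>();
        // Rows that disappeared from the view (net negative) come first...
        for (Map.Entry<String, Integer> e : net.entrySet()) {
            for (int i = e.getValue(); i < 0; i++) {
                diff.add("(F " + e.getKey() + ")");
            }
        }
        // ...followed by rows that appeared in the view (net positive).
        for (Map.Entry<String, Integer> e : net.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                diff.add("(T " + e.getKey() + ")");
            }
        }
        return diff;
    }
}
```

For the ten messages emitted at time 5 in the example quoted further down, `netDiff` returns an empty list, because the sum is 33 both before and after the update. For `(F 10)(T 21)` it returns both messages unchanged.
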
>
> Thanks!
>
>
>
> On Thu, 5 Nov 2020 at 11:36, Jark Wu <imj...@gmail.com> wrote:
>
>> Thanks, Henry, for the detailed example.
>>
>> Let me explain why so many records are emitted at time 5.
>> The retraction mechanism in Flink SQL is triggered per record, so there
>> is record amplification in your case.
>> At time 5, the LAST_VALUE aggregation for stream a first emits -(1,
>> 12345, 0) and then +(1, 12345, 1).
>> When -(1, 12345, 0) arrives at the join operator, it joins with the
>> 3 previous records in stream b, and the join sends 3 retraction messages.
>> When the 3 retraction messages arrive at the sum aggregation, it
>> produces (F 33)(T 21)(F 21)(T 10)(F 10).
>> In contrast, when +(1, 12345, 1) arrives at the join operator, it sends
>> 3 joined accumulation messages to the sum aggregation, which produces
>> (T 12)(F 12)(T 23)(F 23)(T 33).
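
The amplification described above can be reproduced with a small stand-alone model. This is a sketch in plain Java, not Flink's operators: `RetractionDemo`, `SumAgg`, `onOrderChange` and `simulateTime5` are hypothetical names, and the order in which the join visits the three detail rows (12, 11, 10) is an assumption chosen to match the output reported in this thread.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, stand-alone model of the join + SUM pipeline; not Flink code.
final class RetractionDemo {

    // SUM aggregate that reacts to every single input message.
    static final class SumAgg {
        private long sum = 0;
        private int rows = 0; // rows currently contributing to the sum
        private final StringBuilder out = new StringBuilder();

        void onMessage(boolean accumulate, long quantity) {
            if (rows > 0) out.append("(F ").append(sum).append(")"); // retract old result
            sum += accumulate ? quantity : -quantity;
            rows += accumulate ? 1 : -1;
            if (rows > 0) out.append("(T ").append(sum).append(")"); // emit new result
        }
    }

    // A change on the order side joins with every stored detail row and
    // forwards one message per match to the aggregate.
    static void onOrderChange(boolean accumulate, List<Long> details, SumAgg agg) {
        for (long quantity : details) {
            agg.onMessage(accumulate, quantity);
        }
    }

    static String simulateTime5() {
        // Assumed iteration order of the join state (matches the thread's output).
        List<Long> details = Arrays.asList(12L, 11L, 10L);
        SumAgg agg = new SumAgg();
        onOrderChange(true, details, agg);  // state before time 5: sum = 33
        agg.out.setLength(0);               // keep only what time 5 emits
        onOrderChange(false, details, agg); // -(1, 12345, 0): three retractions
        onOrderChange(true, details, agg);  // +(1, 12345, 1): three accumulations
        return agg.out.toString();
    }

    public static void main(String[] args) {
        System.out.println(simulateTime5());
    }
}
```

Running `main` prints the same ten messages reported for time 5: one updated order row becomes six messages after the join, and each of those triggers the aggregate separately.
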
>>
>> In Flink SQL, the mini-batch [1] optimization can reduce this
>> amplification, because computation is triggered once per mini-batch of
>> records instead of once per record.
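
A minimal sketch of enabling mini-batch, assuming the `tableEnv` from the code quoted further down and the Blink planner. The three option keys are the ones listed on the documentation page referenced as [1]; the latency and size values are illustrative only and need tuning for a real job. This is a configuration fragment rather than a complete program.

```java
import org.apache.flink.configuration.Configuration;

// Fragment: `tableEnv` is assumed to be the table environment used elsewhere in this thread.
Configuration configuration = tableEnv.getConfig().getConfiguration();

// Buffer input records instead of triggering the aggregation per record.
configuration.setString("table.exec.mini-batch.enabled", "true");
// Maximum time a record may wait in the buffer (illustrative value).
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// Maximum number of records buffered per mini-batch (illustrative value).
configuration.setString("table.exec.mini-batch.size", "5000");
```

The trade-off is latency: results are emitted at most once per mini-batch, so a larger allowed latency removes more intermediate messages but delays the output.
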
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
>>
>>
>> On Wed, 4 Nov 2020 at 23:01, Henry Dai <dhythe...@gmail.com> wrote:
>>
>>>
>>> Dear Flink developers and users,
>>>
>>>     I have a question about Flink SQL that has been giving me a lot of
>>> trouble. Thank you very much for any help.
>>>
>>>     Let's assume we have two data streams, `order` and `order_detail`,
>>> which come from the MySQL binlog.
>>>
>>>     Table `order` schema:
>>>     id              int     primary key
>>>     order_id    int
>>>     status        int
>>>
>>>     Table `order_detail` schema:
>>>     id               int     primary key
>>>     order_id     int
>>>     quantity      int
>>>
>>>     order : order_detail = 1:N; they are joined on `order_id`.
>>>
>>>     Suppose we have the following data sequence, and we compute
>>> sum(quantity) grouped by order.order_id after the join:
>>>
>>> time  order                     order_detail                result
>>>       id  order_id  status      id  order_id  quantity
>>> 1     1   12345     0
>>> 2                               1   12345     10          (T 10)
>>> 3                               2   12345     11          (F 10)(T 21)
>>> 4                               3   12345     12          (F 21)(T 33)
>>> 5     1   12345     1                                     (F 33)(T 21)(F 21)(T 10)(F 10)
>>>                                                           (T 12)(F 12)(T 23)(F 23)(T 33)
>>>
>>>     Code:
>>>     tableEnv.registerTableSource("a", new Order());
>>>     tableEnv.registerTableSource("b", new OrderDetail());
>>>     Table tbl1 = tableEnv.sqlQuery(
>>>         "SELECT id, LAST_VALUE(order_id) AS order_id, "
>>>             + "LAST_VALUE(status) AS status FROM a GROUP BY id");
>>>     tableEnv.registerTable("ax", tbl1);
>>>     Table tbl2 = tableEnv.sqlQuery(
>>>         "SELECT id, LAST_VALUE(order_id) AS order_id, "
>>>             + "LAST_VALUE(quantity) AS quantity FROM b GROUP BY id");
>>>     tableEnv.registerTable("bx", tbl2);
>>>     Table table = tableEnv.sqlQuery(
>>>         "SELECT ax.order_id, SUM(bx.quantity) "
>>>             + "FROM ax JOIN bx ON ax.order_id = bx.order_id "
>>>             + "GROUP BY ax.order_id");
>>>     DataStream<Tuple2<Boolean, Row>> stream =
>>>         tableEnv.toRetractStream(table, Row.class);
>>>     stream.print();
>>>
>>>     Result:
>>>     (true,12345,10)
>>>     (false,12345,10)
>>>     (true,12345,21)
>>>     (false,12345,21)
>>>     (true,12345,33)
>>>     (false,12345,33)
>>>     (true,12345,21)
>>>     (false,12345,21)
>>>     (true,12345,10)
>>>     (false,12345,10)
>>>     (true,12345,12)
>>>     (false,12345,12)
>>>     (true,12345,23)
>>>     (false,12345,23)
>>>     (true,12345,33)
>>>
>>>
>>>     I can't understand why Flink emits so many records at time 5.
>>>
>>>     In production, we consume the binlog stream from Kafka, convert the
>>> stream to a Flink table, and, after the SQL computation, convert the
>>> result table back to a Flink stream, where we keep only the TRUE messages
>>> of the retract stream and emit them to downstream Kafka.
>>>
>>>     Is there a way to make the Flink dynamic table behave as I expected
>>> (I mean, trigger the computation only once), so that when we receive
>>> (1,12345,1) from `order`, only (F 33)(T 33) is emitted?
>>>
>>> --
>>> best wishes
>>> hengyu
>>>
>>
>
> --
> best wishes
> hengyu
>
