Hi lsyldliu,

Thanks for investigating this.

First of all, if you are using mini-batch deduplication, it doesn't support
state ttl in 1.9. That's why the tps looks the same as in 1.11 with state
ttl disabled.
We only introduced state ttl for mini-batch deduplication recently.

Regarding the performance regression, it looks very surprising to me: the
performance drops by 19x when StateTtlConfig is enabled in 1.11.
I don't have much experience with the internals of StateTtlConfig, so I'm
looping in @Yu Li <car...@gmail.com> and @YunTang in CC, who may have more
insights on this.

For more information, we use the following StateTtlConfig [1] in the blink
planner:

StateTtlConfig
  .newBuilder(Time.milliseconds(retentionTime))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build();
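
For context, enabling the incremental cleanup strategy that FLINK-16581 adds
for the heap backend goes through the same builder. A rough sketch follows;
the cleanup size of 10 is just for illustration, I haven't checked what the
new code actually passes:

StateTtlConfig
  .newBuilder(Time.milliseconds(retentionTime))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  // check up to 10 state entries for expiration on every state access;
  // 'false' means no extra cleanup is triggered per processed record
  .cleanupIncrementally(10, false)
  .build();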


Best,
Jark


[1]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27

On Wed, 29 Apr 2020 at 11:53, 刘大龙 <ld...@zju.edu.cn> wrote:

> Hi, all!
>
> On the Flink master branch, we have supported state ttl for SQL mini-batch
> deduplication using the incremental cleanup strategy on the heap backend;
> refer to FLINK-16581. Because I wanted to test the performance of this
> feature, I compiled the master branch code, deployed the jar to a production
> environment, and then ran three types of tests:
>
> 1. flink 1.9.0 release version with state ttl enabled
> 2. flink 1.11-snapshot version with state ttl disabled
> 3. flink 1.11-snapshot version with state ttl enabled
>
> The test query SQL is as follows:
>
> select order_date,
>     sum(price * amount - goods_all_fav_amt - virtual_money_amt
>         + goods_carriage_amt) as saleP,
>     sum(amount) as saleN,
>     count(distinct parent_sn) as orderN,
>     count(distinct user_id) as cusN
> from (
>     select order_date, user_id,
>         order_type, order_status, terminal, last_update_time,
>         goods_all_fav_amt, goods_carriage_amt, virtual_money_amt,
>         price, amount, order_quality, quality_goods_cnt, acture_goods_amt
>     from (
>         select *,
>             row_number() over (partition by order_id, order_goods_id
>                                order by proctime desc) as rownum
>         from dm_trd_order_goods)
>     where rownum = 1
>         and (order_type in (1, 2, 3, 4, 5) or order_status = 70)
>         and terminal = 'shop' and price > 0)
> group by order_date
>
>
> At runtime, this query generates two operators: Deduplication and GroupAgg.
> In all three tests the configuration is the same: parallelism is 20, the
> Kafka consumer starts from the earliest offset, and the mini-batch function
> is disabled (see the config sketch after the results). The test results are
> as follows:
>
> 1. flink 1.9.0 with state ttl enabled: this test lasted 44m, Flink received
> 13.74 million records, average tps around 5200/s (Flink UI pictures: back
> pressure, checkpoint).
> 2. flink 1.11-snapshot with state ttl disabled: this test lasted 28m, Flink
> received 8.83 million records, average tps around 5200/s (Flink UI pictures:
> back pressure, checkpoint).
> 3. flink 1.11-snapshot with state ttl enabled: this test lasted 1h 43m,
> Flink received only 1.68 million records because the deduplication operator
> was under serious back pressure, average tps around 270/s; moreover,
> checkpoints always failed for the same reason (Flink UI pictures: back
> pressure, checkpoint).
>
> The deduplication state cleanup in Flink 1.9.0 is implemented with timers,
> while the 1.11-snapshot version uses StateTtlConfig; this is the main
> difference. Comparing the three tests, we can see that 1.11-snapshot with
> state ttl disabled performs the same as 1.9.0 with state ttl enabled.
> However, with state ttl enabled in 1.11-snapshot, performance drops by
> nearly 20x, so I think the incremental cleanup strategy causes this problem.
> What do you think about it? @azagrebin, @jark.
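>
> My rough understanding of the incremental cleanup strategy (a simplified,
> hypothetical sketch, not the actual Flink classes) is that every state
> access additionally walks a few entries of the state map to expire them,
> which adds work on the hot path of the deduplication operator:
>
> // hypothetical illustration only, not Flink's real implementation
> class IncrementalTtlCleanup {
>     interface Expirable { boolean isExpired(long now); }
>
>     private final java.util.Iterator<Expirable> stateIterator; // over the whole state map
>     private final int cleanupSize;                              // e.g. a handful of entries
>
>     IncrementalTtlCleanup(java.util.Iterator<Expirable> it, int size) {
>         this.stateIterator = it;
>         this.cleanupSize = size;
>     }
>
>     // called on every state access, in addition to the actual read/write
>     void cleanupSomeEntries(long now) {
>         for (int i = 0; i < cleanupSize && stateIterator.hasNext(); i++) {
>             if (stateIterator.next().isExpired(now)) {
>                 stateIterator.remove();
>             }
>         }
>     }
> }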
>
> Thanks.
>
> lsyldliu
>
> Zhejiang University, College of Control Science and Engineering, CSC
