Hi lsyldliu,

Thanks for investigating this.
First of all, if you are using mini-batch deduplication, note that it does not support state TTL in 1.9. That's why the TPS looks the same as 1.11 with state TTL disabled. We only introduced state TTL for mini-batch deduplication recently.

Regarding the performance regression, it looks very surprising to me. The performance is reduced by 19x when StateTtlConfig is enabled in 1.11. I don't have much experience with the internals of StateTtlConfig, so I'm looping in @Yu Li <car...@gmail.com> and @YunTang in CC, who may have more insight on this.

For more information, we use the following StateTtlConfig [1] in the blink planner:

StateTtlConfig
    .newBuilder(Time.milliseconds(retentionTime))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

Best,
Jark

[1]: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27

On Wed, 29 Apr 2020 at 11:53, 刘大龙 <ld...@zju.edu.cn> wrote:

> Hi, all!
>
> On the Flink master branch, we now support state TTL for SQL mini-batch
> deduplication using the incremental cleanup strategy on the heap backend;
> see FLINK-16581. Because I wanted to test the performance of this feature,
> I compiled the master branch code, deployed the jar to a production
> environment, and then ran three types of tests:
>
> Flink 1.9.0 release version with state TTL enabled
> Flink 1.11-SNAPSHOT version with state TTL disabled
> Flink 1.11-SNAPSHOT version with state TTL enabled
>
> The test query SQL is as follows:
>
> select order_date,
>        sum(price * amount - goods_all_fav_amt - virtual_money_amt +
>            goods_carriage_amt) as saleP,
>        sum(amount) as saleN,
>        count(distinct parent_sn) as orderN,
>        count(distinct user_id) as cusN
> from (
>     select order_date, user_id,
>            order_type, order_status, terminal, last_update_time, goods_all_fav_amt,
>            goods_carriage_amt, virtual_money_amt, price, amount,
>            order_quality, quality_goods_cnt, acture_goods_amt
>     from (select *, row_number() over(partition by order_id, order_goods_id
>           order by proctime desc) as rownum from dm_trd_order_goods)
>     where rownum = 1
>       and (order_type in (1,2,3,4,5) or order_status = 70)
>       and terminal = 'shop' and price > 0)
> group by order_date
>
> At runtime, this query generates two operators: Deduplication and GroupAgg.
> In all tests the configuration is the same: parallelism is 20, the Kafka
> consumer starts from the earliest offset, and the mini-batch function is
> disabled. The test results are as follows:
>
> Flink 1.9.0 with state TTL enabled: the test lasted 44m, Flink received
> 13.74 million records, average TPS about 5200/s. Flink UI picture links:
> back pressure, checkpoint.
> Flink 1.11-SNAPSHOT with state TTL disabled: the test lasted 28m, Flink
> received 8.83 million records, average TPS about 5200/s. Flink UI picture
> links: back pressure, checkpoint.
> Flink 1.11-SNAPSHOT with state TTL enabled: the test lasted 1h 43m, Flink
> received only 1.68 million records because of serious back pressure on the
> deduplication operator, average TPS about 270/s; moreover, checkpoints
> always fail because of that back pressure. Flink UI picture links:
> back pressure, checkpoint.
>
> Deduplication state cleanup in Flink 1.9.0 is implemented with timers,
> whereas the 1.11-SNAPSHOT version uses StateTtlConfig; this is the main
> difference. Comparing the three tests, we can see that with state TTL
> disabled, the 1.11-SNAPSHOT performance is the same as 1.9.0 with state
> TTL enabled.
> However, with state TTL enabled in 1.11-SNAPSHOT, performance drops by
> nearly 20x, so I think the incremental cleanup strategy causes this
> problem. What do you think about it? @azagrebin, @jark.
>
> Thanks.
>
> lsyldliu
>
> Zhejiang University, College of Control Science and Engineering, CSC
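For reference, the incremental cleanup strategy discussed above is switched on through the same StateTtlConfig builder. Below is a minimal sketch, assuming retentionTime comes from the table's idle-state retention setting as in [1]; the cleanup parameters (5 entries checked per state access, no extra cleanup per processed record) are illustrative values only, not necessarily what the planner uses:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlConfigSketch {

    // retentionTime is assumed to come from the table's idle state retention
    // setting, as in StateTtlConfigUtil [1]; it is passed in by the caller here.
    public static StateTtlConfig withIncrementalCleanup(long retentionTime) {
        return StateTtlConfig
                .newBuilder(Time.milliseconds(retentionTime))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Incremental cleanup on the heap backend: check up to 5 state
                // entries on every state access; "false" means no additional
                // cleanup pass per processed record. Both values are illustrative.
                .cleanupIncrementally(5, false)
                .build();
    }
}

With incremental cleanup, expired entries are purged a few at a time as a side effect of state accesses, whereas the 1.9.0 deduplication operator cleans up state via registered timers, so the two approaches put their cleanup cost in different places on the per-record path.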