[ https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805110#comment-17805110 ]
Jinzhong Li edited comment on FLINK-33881 at 1/10/24 12:29 PM:
---------------------------------------------------------------

[~Zakelly] I have run the performance tests with TtlListStateBenchmark. The results show that this optimization improves TtlListState performance by about 27% overall.

|Benchmark|Param: backendType|Param: expiredOption|Param: stateVisibility|Param: updateType|Score {color:#ff0000}without{color} optimization|Score {color:#ff0000}with{color} optimization|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|686.100001|902.399321|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|713.428757|906.389711|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|720.125366|902.577086|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|736.226185|881.717185|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|733.647731|894.295796|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|721.162458|901.177007|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|731.355353|890.06144|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAdd|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|726.943995|909.319816|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|212.313443|261.96109|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|211.863397|258.266859|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|215.67765|259.52777|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|208.505043|263.768959|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|201.646868|257.422463|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|214.599971|267.296024|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|207.070109|263.741483|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAddAll|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|213.35357|265.783929|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|536.94471|688.041948|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|538.545608|678.766982|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|530.422217|688.008454|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|540.517729|690.383004|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|538.089004|695.018344|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|532.924439|690.375419|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|513.289823|715.811655|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listAppend|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|534.967481|724.869126|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|630.801404|822.861466|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|495.627707|678.028432|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|645.222027|780.880865|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|513.539414|674.461302|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|645.480934|763.709594|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|512.678829|673.243958|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|646.656774|822.529003|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGet|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|518.734052|664.093519|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|592.612052|778.660071|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|507.275622|655.986257|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|597.026859|814.902181|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|500.038208|695.082031|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|593.968845|783.100913|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|503.457165|654.044131|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|592.170287|722.38563|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listGetAndIterate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|498.257928|652.923734|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnCreateAndWrite|557.020907|683.122647|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|NeverReturnExpired|OnReadAndWrite|556.828811|696.510575|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|554.269049|670.339559|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|Expire3PercentPerIteration|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|547.803797|708.379692|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|NeverReturnExpired|OnCreateAndWrite|562.205645|700.56704|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|NeverReturnExpired|OnReadAndWrite|553.76725|683.665343|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnCreateAndWrite|536.061935|679.364527|
|org.apache.flink.state.benchmark.ttl.TtlListStateBenchmark.listUpdate|HEAP|NeverExpired|ReturnExpiredIfNotCleanedUp|OnReadAndWrite|556.908654|719.853693|
|Total| | | | |25138.13506|32035.67703|

> [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull
> ----------------------------------------------------------------------------
>
> Key: FLINK-33881
> URL: https://issues.apache.org/jira/browse/FLINK-33881
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Reporter: Jinzhong Li
> Assignee: Jinzhong Li
> Priority: Minor
> Labels: pull-request-available
> Attachments: image-2023-12-19-21-25-21-446.png, image-2023-12-19-21-26-43-518.png
>
>
> In some scenarios, 'TtlListState#getUnexpiredOrNull -> elementSerializer.copy(ttlValue)' consumes a lot of CPU resources.
> !image-2023-12-19-21-25-21-446.png|width=529,height=119!
> I found that TtlListState#getUnexpiredOrNull, even if none of the elements have expired, still copies all the elements and updates the whole list/map in TtlIncrementalCleanup#runCleanup().
> !image-2023-12-19-21-26-43-518.png|width=505,height=266!
> I think we could optimize TtlListState#getUnexpiredOrNull by:
> 1) finding the index of the first expired element in the list;
> 2) if none is found, returning the original list;
> 3) if one is found, constructing the unexpired list (adding the preceding elements to it) and then going through the subsequent elements, adding the unexpired ones to the list.
> {code:java}
> public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) {
>     //.......
>     // Step 1: find the index of the first expired element, if any.
>     int firstExpireIndex = -1;
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>             firstExpireIndex = i;
>             break;
>         }
>     }
>     // Step 2: nothing has expired, so skip the copy entirely.
>     if (firstExpireIndex == -1) {
>         return ttlValues; // return the original ttlValues
>     }
>     // Step 3: rebuild the list without the expired elements.
>     List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
>     for (int i = 0; i < ttlValues.size(); i++) {
>         if (i < firstExpireIndex) {
>             // Elements before the first expired one are known to be unexpired.
>             // unexpired.add(ttlValues.get(i));
>             unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>         }
>         if (i > firstExpireIndex) {
>             if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) {
>                 // unexpired.add(ttlValues.get(i));
>                 unexpired.add(elementSerializer.copy(ttlValues.get(i)));
>             }
>         }
>     }
>     // .....
> }
> {code}
> *In this way, the extra iteration overhead is very small, but the benefit when there are no expired elements is significant.*

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
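
The three steps proposed in the issue description can be tried out in isolation with the sketch below. It is not the Flink implementation: `TtlListSketch`, `Stamped`, `expired`, and `dropExpired` are hypothetical names, `Stamped` stands in for `TtlValue`, the `copier` argument stands in for `elementSerializer.copy`, and the expiry check is a simplified assumption (expired once last access plus TTL is not later than the current time).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class TtlListSketch {

    /** Hypothetical stand-in for TtlValue: a value plus its last-access timestamp. */
    static final class Stamped<T> {
        final T value;
        final long lastAccessTimestamp;

        Stamped(T value, long lastAccessTimestamp) {
            this.value = value;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Simplified expiry check (assumption, ignores overflow handling). */
    static <T> boolean expired(Stamped<T> v, long ttl, long now) {
        return v.lastAccessTimestamp + ttl <= now;
    }

    /** Returns the input list itself when nothing has expired; otherwise a filtered copy. */
    static <T> List<Stamped<T>> dropExpired(
            List<Stamped<T>> values, long ttl, long now, UnaryOperator<Stamped<T>> copier) {
        // Step 1: find the first expired element.
        int firstExpired = -1;
        for (int i = 0; i < values.size(); i++) {
            if (expired(values.get(i), ttl, now)) {
                firstExpired = i;
                break;
            }
        }
        // Step 2: fast path, no copies and no new list.
        if (firstExpired == -1) {
            return values;
        }
        // Step 3: copy the known-unexpired prefix, then filter the rest.
        List<Stamped<T>> unexpired = new ArrayList<>(values.size());
        for (int i = 0; i < values.size(); i++) {
            if (i == firstExpired) {
                continue; // already known to be expired
            }
            if (i < firstExpired || !expired(values.get(i), ttl, now)) {
                unexpired.add(copier.apply(values.get(i)));
            }
        }
        return unexpired;
    }

    public static void main(String[] args) {
        List<Stamped<String>> fresh = List.of(
                new Stamped<String>("a", 100L), new Stamped<String>("b", 100L));
        // Nothing expired: the very same list instance comes back.
        System.out.println(dropExpired(fresh, 50L, 120L, v -> v) == fresh); // prints true

        List<Stamped<String>> mixed = List.of(
                new Stamped<String>("a", 100L), new Stamped<String>("b", 10L),
                new Stamped<String>("c", 100L), new Stamped<String>("d", 20L));
        // "b" and "d" are expired, so only "a" and "c" survive.
        System.out.println(dropExpired(mixed, 50L, 120L, v -> v).size()); // prints 2
    }
}
```

Callers can detect the fast path by comparing the returned reference with the input, which is what lets the no-expiry case skip both the per-element copy and any follow-up state update.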