[ https://issues.apache.org/jira/browse/FLINK-33881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jinzhong Li updated FLINK-33881: -------------------------------- Description: In some scenarios, 'TtlListState#getUnexpiredOrNull -> elementSerializer.copy(ttlValue)' consumes a lot of cpu resources. !image-2023-12-19-21-25-21-446.png|width=529,height=119! I found that for TtlListState#getUnexpiredOrNull, if none of the elements have expired, it still needs to copy all the elements and update the whole list/map in TtlIncrementalCleanup#runCleanup(); !image-2023-12-19-21-26-43-518.png|width=505,height=266! I think we could optimize TtlListState#getUnexpiredOrNull by: 1)find the first expired element index in the list; 2)If not found, return to the original list; 3)If found, then constrct the unexpire list (puts the previous elements into the list), and go through the subsequent elements, adding expired elements into the list. {code:java} public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) { //....... int firstExpireIndex = -1; for (int i = 0; i < ttlValues.size(); i++) { if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { firstExpireIndex = i; break; } } if (firstExpireIndex == -1) { return ttlValues; //return the original ttlValues } List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); for (int i = 0; i < ttlValues.size(); i++) { if (i < firstExpireIndex) { unexpired.add(ttlValues.get(i)); } if (i > firstExpireIndex) { if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { unexpired.add(ttlValues.get(i)); } } } // ..... } {code} In this way, the extra iteration overhead is actually very very small, but the benefit when there are no expired elements is significant. was: In some scenarios, 'TtlListState#getUnexpiredOrNull -> elementSerializer.copy(ttlValue)' consumes a lot of cpu resources. !image-2023-12-19-21-25-21-446.png|width=444,height=100! I found that for TtlListState#getUnexpiredOrNull, if none of the elements have expired, it still needs to copy all the elements and update the whole list/map in TtlIncrementalCleanup#runCleanup(); !image-2023-12-19-21-26-43-518.png|width=317,height=167! I think we could optimize TtlListState#getUnexpiredOrNull by: 1)find the first expired element index in the list; 2)If not found, return to the original list; 3)If found, then constrct the unexpire list (puts the previous elements into the list), and go through the subsequent elements, adding expired elements into the list. {code:java} public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> ttlValues) { //....... int firstExpireIndex = -1; for (int i = 0; i < ttlValues.size(); i++) { if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { firstExpireIndex = i; break; } } if (firstExpireIndex == -1) { return ttlValues; //return the original ttlValues } List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); for (int i = 0; i < ttlValues.size(); i++) { if (i < firstExpireIndex) { unexpired.add(ttlValues.get(i)); } if (i > firstExpireIndex) { if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { unexpired.add(ttlValues.get(i)); } } } // ..... } {code} In this way, the extra iteration overhead is actually very very small, but the benefit when there are no expired elements is significant. > [TtlListState]Avoid copy and update value in TtlListState#getUnexpiredOrNull > ---------------------------------------------------------------------------- > > Key: FLINK-33881 > URL: https://issues.apache.org/jira/browse/FLINK-33881 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends > Reporter: Jinzhong Li > Priority: Minor > Attachments: image-2023-12-19-21-25-21-446.png, > image-2023-12-19-21-26-43-518.png > > > In some scenarios, 'TtlListState#getUnexpiredOrNull -> > elementSerializer.copy(ttlValue)' consumes a lot of cpu resources. > !image-2023-12-19-21-25-21-446.png|width=529,height=119! > I found that for TtlListState#getUnexpiredOrNull, if none of the elements > have expired, it still needs to copy all the elements and update the whole > list/map in TtlIncrementalCleanup#runCleanup(); > !image-2023-12-19-21-26-43-518.png|width=505,height=266! > I think we could optimize TtlListState#getUnexpiredOrNull by: > 1)find the first expired element index in the list; > 2)If not found, return to the original list; > 3)If found, then constrct the unexpire list (puts the previous elements into > the list), and go through the subsequent elements, adding expired elements > into the list. > {code:java} > public List<TtlValue<T>> getUnexpiredOrNull(@Nonnull List<TtlValue<T>> > ttlValues) { > //....... > int firstExpireIndex = -1; > for (int i = 0; i < ttlValues.size(); i++) { > if (TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > firstExpireIndex = i; > break; > } > } > if (firstExpireIndex == -1) { > return ttlValues; //return the original ttlValues > } > List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size()); > for (int i = 0; i < ttlValues.size(); i++) { > if (i < firstExpireIndex) { > unexpired.add(ttlValues.get(i)); > } > if (i > firstExpireIndex) { > if (!TtlUtils.expired(ttlValues.get(i), ttl, currentTimestamp)) { > unexpired.add(ttlValues.get(i)); > } > } > } > // ..... > } {code} > In this way, the extra iteration overhead is actually very very small, but > the benefit when there are no expired elements is significant. -- This message was sent by Atlassian Jira (v8.20.10#820010)