It may not be completely relevant to this conversation this year, but I find myself sharing this article <https://segment.com/blog/exactly-once-delivery/> once or twice a year when opining about how hard deduplication at scale can be. 😅
-0xe1a

On Thu, Apr 11, 2024 at 10:22 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Lei,
>
> There is additional overhead when adding new keys to an operator, since
> Flink needs to maintain the state, timers, etc. for each individual key.
> If you are interested in more details, I suggest using the Flink UI to
> compare the flame graphs for the stages. There you can see the difference
> between the two versions and identify the exact method call causing the
> difference. The flame graph could help identify other bottlenecks too.
>
> I hope this helps,
> Peter
>
> On Thu, Apr 11, 2024, 08:42 Lei Wang <leiwang...@gmail.com> wrote:
>
>> Hi Peter,
>>
>> I tried it, and it improved performance significantly, but I don't know
>> exactly why. As far as I know, the number of keys in RocksDB doesn't
>> decrease.
>>
>> Is there any specific technical material about this?
>>
>> Thanks,
>> Lei
>>
>>
>> On Fri, Mar 29, 2024 at 9:49 PM Lei Wang <leiwang...@gmail.com> wrote:
>>
>>> Perhaps I can keyBy(Hash(originalKey) % 100000), and then in the
>>> KeyedProcessFunction use MapState instead of ValueState:
>>>
>>>     MapState<OriginalKey, Boolean> mapState
>>>
>>> There would be about 100,000 OriginalKeys in each MapState.
>>>
>>> Hopefully this will help.
>>>
>>> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Lei,
>>>>
>>>> Have you tried making the key smaller and storing a list of the found
>>>> keys as the value?
>>>>
>>>> Let's make the operator key a hash of your original key, and store a
>>>> list of the full keys in the state. You can play with your hash length to
>>>> achieve the optimal number of keys.
>>>>
>>>> I hope this helps,
>>>> Peter
>>>>
>>>> On Fri, Mar 29, 2024, 09:08 Lei Wang <leiwang...@gmail.com> wrote:
>>>>
>>>>> I use the RocksDB state backend to store whether an element appeared
>>>>> within the last day. Here is the code:
>>>>>
>>>>> public class DedupFunction extends KeyedProcessFunction<Long, IN, OUT> {
>>>>>
>>>>>     private ValueState<Boolean> isExist;
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>         ValueStateDescriptor<Boolean> desc = new ........
>>>>>         StateTtlConfig ttlConfig =
>>>>>             StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType......
>>>>>         desc.enableTimeToLive(ttlConfig);
>>>>>         isExist = getRuntimeContext().getState(desc);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void processElement(IN in, .... ) throws Exception {
>>>>>         // Emit only the first occurrence of each key within the TTL window
>>>>>         if (null == isExist.value()) {
>>>>>             out.collect(in);
>>>>>             isExist.update(true);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Because the number of distinct keys is very large (about 10 billion per
>>>>> day), this operator is a performance bottleneck.
>>>>> How can I optimize its performance?
>>>>>
>>>>> Thanks,
>>>>> Lei
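The bucketing idea discussed in the thread (key the operator by `Hash(originalKey) % 100000` and keep the original keys in a map inside each bucket) can be sketched without Flink as plain in-memory Java. This is a minimal illustration under stated assumptions, not the thread's actual implementation: `BucketedDeduplicator`, `firstSeen`, `bucketOf`, `evictExpired`, and `size` are hypothetical names, a `HashMap` per bucket stands in for `MapState`, and per-entry timestamps stand in for the 24-hour state TTL. It only shows that the number of operator keys drops to `numBuckets` while the number of stored original keys stays the same.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the thread's proposal: operator key =
// bucket id (like keyBy(hash(originalKey) % numBuckets)), and each bucket holds
// a map of originalKey -> time it was last recorded (like MapState<OriginalKey, ...>).
class BucketedDeduplicator {
    private final int numBuckets;
    private final long ttlMillis;
    // bucket id -> (original key -> timestamp of the last emitted occurrence)
    private final Map<Integer, Map<String, Long>> buckets = new HashMap<>();

    BucketedDeduplicator(int numBuckets, long ttlMillis) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        this.numBuckets = numBuckets;
        this.ttlMillis = ttlMillis;
    }

    // Bucket id in [0, numBuckets); floorMod avoids negative ids for negative hash codes.
    int bucketOf(String originalKey) {
        return Math.floorMod(originalKey.hashCode(), numBuckets);
    }

    // Returns true if the element should be emitted, i.e. its key was not
    // recorded within the last ttlMillis. Duplicates do not refresh the timestamp.
    boolean firstSeen(String originalKey, long nowMillis) {
        Map<String, Long> bucket =
            buckets.computeIfAbsent(bucketOf(originalKey), id -> new HashMap<>());
        Long seenAt = bucket.get(originalKey);
        if (seenAt != null && nowMillis - seenAt < ttlMillis) {
            return false; // duplicate inside the TTL window
        }
        bucket.put(originalKey, nowMillis);
        return true;
    }

    // Drops expired entries and empty buckets, standing in for the TTL cleanup
    // that the state backend would otherwise be responsible for.
    void evictExpired(long nowMillis) {
        for (Map<String, Long> bucket : buckets.values()) {
            bucket.values().removeIf(seenAt -> nowMillis - seenAt >= ttlMillis);
        }
        buckets.values().removeIf(Map::isEmpty);
    }

    // Total number of original keys currently stored across all buckets.
    int size() {
        return buckets.values().stream().mapToInt(Map::size).sum();
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        BucketedDeduplicator dedup = new BucketedDeduplicator(100_000, day);
        System.out.println(dedup.firstSeen("user-42", 0L));       // true: first occurrence
        System.out.println(dedup.firstSeen("user-42", 1_000L));   // false: duplicate within 24h
        System.out.println(dedup.firstSeen("user-42", day + 1L)); // true: earlier entry expired
    }
}
```

Using `Math.floorMod` rather than `%` keeps bucket ids in `[0, numBuckets)`; in Java, `hashCode() % 100000` can be negative, which roughly doubles the range of bucket ids. With about 10 billion keys per day and 100,000 buckets, each bucket would hold on the order of 100,000 original keys, matching the estimate in the thread.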