It may not be entirely relevant to this conversation at this point, but I
find myself sharing this article
<https://segment.com/blog/exactly-once-delivery/> once or twice a year when
opining about how hard deduplication at scale can be. 😅

-0xe1a

On Thu, Apr 11, 2024 at 10:22 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Lei,
>
> There is additional overhead when adding new keys to an operator, since
> Flink needs to maintain the state, timers, etc. for the individual keys.
> If you are interested in more details, I suggest using the Flink UI and
> comparing the flame graphs for the stages. There you can see the
> difference between the two versions and identify the exact method call
> causing the difference.
> The flame graph could help identify other bottlenecks too.
>
> I hope this helps,
> Peter
>
> On Thu, Apr 11, 2024, 08:42 Lei Wang <leiwang...@gmail.com> wrote:
>
>> Hi Peter,
>>
>> I tried this, and it improved performance significantly, but I don't know
>> exactly why.
>> As far as I know, the number of keys in RocksDB doesn't decrease.
>>
>> Any specific technical material about this?
>>
>> Thanks,
>> Lei
>>
>>
>> On Fri, Mar 29, 2024 at 9:49 PM Lei Wang <leiwang...@gmail.com> wrote:
>>
>>> Perhaps I can keyBy(Hash(originalKey) % 100000), and then in the
>>> KeyedProcessFunction use MapState instead of ValueState:
>>>   MapState<OriginalKey, Boolean> mapState
>>>
>>> There would be about 100,000 OriginalKeys in each mapState.
>>>
>>> Hope this will help
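
[Editor's note: the bucketing idea above can be sketched outside Flink with
plain collections. This is an illustrative stand-in, not Flink API: the class
name, bucket count, and String key type are my assumptions. The per-bucket set
of original keys plays the role of the MapState<OriginalKey, Boolean> inside a
KeyedProcessFunction.]

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the bucketed dedup idea, NOT Flink API:
// the operator key becomes hash(originalKey) % numBuckets, and each
// bucket keeps the set of original keys already seen, standing in for
// a MapState<OriginalKey, Boolean> inside a KeyedProcessFunction.
class BucketedDedup {
    private final int numBuckets;
    private final Map<Integer, Set<String>> buckets = new HashMap<>();

    BucketedDedup(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    private int bucketOf(String key) {
        // floorMod avoids negative bucket indices for negative hash codes.
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    // Returns true the first time a key is seen, false afterwards.
    boolean firstSeen(String key) {
        return buckets
            .computeIfAbsent(bucketOf(key), b -> new HashSet<>())
            .add(key);
    }
}
```

With the 10 billion daily keys mentioned downthread and 100,000 buckets, each
bucket would hold roughly 100,000 original keys, matching the estimate above.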
>>>
>>> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> Hi Lei,
>>>>
>>>> Have you tried making the key smaller and storing a list of the found
>>>> keys as the value?
>>>>
>>>> Let's make the operator key a hash of your original key, and store a
>>>> list of the full keys in the state. You can play with your hash length to
>>>> achieve the optimal number of keys.
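
[Editor's note: the hash-length tuning suggested above can be made concrete
with a quick estimate. The class and method names below are mine for
illustration, not Flink API; the only figure taken from the thread is the
roughly 10 billion daily keys in the original message.]

```java
// Back-of-the-envelope estimate: with a b-bit hash as the operator key
// there are 2^b distinct operator keys, so each one holds roughly
// totalKeys / 2^b full keys in its state value. The numbers are
// assumptions for illustration, apart from the ~10 billion daily keys
// mentioned in the original message.
class HashLengthEstimate {
    static long keysPerOperatorKey(long totalKeys, int hashBits) {
        return totalKeys / (1L << hashBits);
    }
}
```

For example, 10 billion daily keys with a 17-bit hash (131,072 operator keys)
works out to about 76,000 full keys per state entry.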
>>>>
>>>> I hope this helps,
>>>> Peter
>>>>
>>>> On Fri, Mar 29, 2024, 09:08 Lei Wang <leiwang...@gmail.com> wrote:
>>>>
>>>>>
>>>>> I use the RocksDB state backend to store whether the element appeared
>>>>> within the last day; here is the code:
>>>>>
>>>>> public class DedupFunction extends KeyedProcessFunction<Long, IN, OUT> {
>>>>>
>>>>>     private ValueState<Boolean> isExist;
>>>>>
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>         ValueStateDescriptor<Boolean> desc = new ........
>>>>>         StateTtlConfig ttlConfig =
>>>>> StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType......
>>>>>         desc.enableTimeToLive(ttlConfig);
>>>>>         isExist = getRuntimeContext().getState(desc);
>>>>>     }
>>>>>
>>>>>     public void processElement(IN in, .... ) {
>>>>>         if (null == isExist.value()) {
>>>>>             out.collect(in);
>>>>>             isExist.update(true);
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Because the number of distinct keys is very large (about 10 billion per
>>>>> day), this operator is a performance bottleneck.
>>>>> How can I optimize the performance?
>>>>>
>>>>> Thanks,
>>>>> Lei
>>>>>
>>>>>
>>>>
