Hi Lei,

In addition to the valuable options already suggested, you could try to optimize your partitioning function (since you know your data). If possible, sample [a subset of] your data and/or check the key distribution before redefining your partitioning function.
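As a rough illustration (plain Java outside Flink; the class name and sample data are made up, and in practice the sample would come from your actual source), a quick skew check could look like this:

```java
import java.util.*;
import java.util.stream.*;

// Sketch: sample keys and measure how dominant the hottest key is before
// choosing a partitioning function. Hypothetical example, not Flink code.
public class KeySkewCheck {

    // Returns the fraction of records owned by the single hottest key.
    static double hottestKeyShare(List<String> sampleKeys) {
        Map<String, Long> counts = sampleKeys.stream()
                .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
        long max = counts.values().stream().mapToLong(Long::longValue).max().orElse(0L);
        return sampleKeys.isEmpty() ? 0.0 : (double) max / sampleKeys.size();
    }

    public static void main(String[] args) {
        // Hypothetical skewed sample: one key owns 90% of the records.
        List<String> sample = new ArrayList<>();
        for (int i = 0; i < 90; i++) sample.add("hot");
        for (int i = 0; i < 10; i++) sample.add("key-" + i);
        System.out.println(hottestKeyShare(sample)); // prints 0.9 -> heavily skewed
    }
}
```

If the share is close to 1.0, a plain hash partitioner will send most of the load to one subtask, and a custom partitioning (or key-splitting) scheme is worth the effort.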
Regards,
Jeyhun

On Mon, Apr 1, 2024 at 4:00 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, Wang.
>
> What about just increasing the parallelism to reduce the number of keys
> processed per parallelism? Is the distribution of keys uneven? If so, you
> can use the DataStream API to manually implement some of the optimization
> methods of Flink SQL. [1]
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation
>
> --
> Best!
> Xuyang
>
> On 2024-03-29 21:49:42, "Lei Wang" <leiwang...@gmail.com> wrote:
>
> Perhaps I can keyBy(Hash(originalKey) % 100000), then in the
> KeyedProcessFunction use MapState instead of ValueState:
> MapState<OriginalKey, Boolean> mapState
>
> There are about 100000 original keys for each mapState.
>
> Hope this will help.
>
> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Hi Lei,
>>
>> Have you tried to make the key smaller and store a list of the found
>> keys as the value?
>>
>> Let's make the operator key a hash of your original key, and store a
>> list of the full keys in the state. You can play with the hash length to
>> achieve the optimal number of keys.
>>
>> I hope this helps,
>> Peter
>>
>> On Fri, Mar 29, 2024, 09:08 Lei Wang <leiwang...@gmail.com> wrote:
>>
>>> I use the RocksDB state backend to store whether an element appeared
>>> within the last day. Here is the code:
>>>
>>> public class DedupFunction extends KeyedProcessFunction<Long, IN, OUT> {
>>>
>>>     private ValueState<Boolean> isExist;
>>>
>>>     public void open(Configuration parameters) throws Exception {
>>>         ValueStateDescriptor<Boolean> desc = new ........
>>>         StateTtlConfig ttlConfig =
>>>             StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType......
>>>         desc.enableTimeToLive(ttlConfig);
>>>         isExist = getRuntimeContext().getState(desc);
>>>     }
>>>
>>>     public void processElement(IN in, Context ctx, Collector<OUT> out) throws Exception {
>>>         if (null == isExist.value()) {
>>>             out.collect(in);
>>>             isExist.update(true);
>>>         }
>>>     }
>>> }
>>>
>>> Because the number of distinct keys is too large (about 10 billion per
>>> day), there's a performance bottleneck in this operator. How can I
>>> optimize its performance?
>>>
>>> Thanks,
>>> Lei
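The bucketing idea discussed above (keyBy on hash(originalKey) % 100000, with the full keys tracked per bucket) can be sketched outside Flink roughly like this. All names here are hypothetical, and the plain `Set` stands in for what would be a RocksDB-backed `MapState<OriginalKey, Boolean>` with TTL in the real job:

```java
import java.util.*;

// Sketch: route each original key to hash(key) % NUM_BUCKETS, and keep a
// per-bucket set of full keys. In Flink, the bucket id would be the operator
// key and the set would be MapState, so each subtask handles far fewer
// operator keys while still deduplicating on the original key.
public class BucketedDedup {

    static final int NUM_BUCKETS = 100_000;

    private final Map<Integer, Set<String>> seenPerBucket = new HashMap<>();

    static int bucketOf(String originalKey) {
        // floorMod keeps the bucket non-negative even for negative hashCodes.
        return Math.floorMod(originalKey.hashCode(), NUM_BUCKETS);
    }

    /** Returns true the first time a key is seen, false for duplicates. */
    public boolean firstSeen(String originalKey) {
        Set<String> seen =
                seenPerBucket.computeIfAbsent(bucketOf(originalKey), b -> new HashSet<>());
        return seen.add(originalKey);
    }

    public static void main(String[] args) {
        BucketedDedup dedup = new BucketedDedup();
        System.out.println(dedup.firstSeen("order-42")); // prints true
        System.out.println(dedup.firstSeen("order-42")); // prints false (duplicate)
    }
}
```

Note that this trades state-key count for value size: with ~10 billion keys per day over 100000 buckets, each bucket's map still holds on the order of 100000 entries, so per-bucket lookups stay cheap only as long as MapState point reads (rather than a single large value) are used.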