Hi Kien,

Thanks for your reply!
The approaches you suggested are very useful. I'll redesign the state structure and try them. Thanks a lot!

Best,
Paul Lam

> On Sep 14, 2018, at 17:01, Kien Truong <duckientru...@gmail.com> wrote:
>
> Hi Paul,
>
> We have actually done something like this. Clearing a state with the RocksDB
> state backend can be a very expensive operation, and can block the
> operators for minutes with large states.
>
> To mitigate that, there are two approaches that we are using:
>
> 1. Keep the state small by increasing the cardinality of the key.
> 2. Do not clear the entire state at once, but continuously remove small
>    chunks of the state using timers.
>
> Regards,
> Kien
>
>> On Sep 14, 2018, at 15:01, David Anderson <da...@data-artisans.com> wrote:
>>
>> Paul,
>>
>> Theoretically, processing-time timers will get the job done, but yes, you'd
>> need a timer per key -- and folks who've tried this with millions of keys,
>> all firing at the same time, have reported that this behaves badly. For some
>> use cases it's workable to spread out the timers over an interval, like an
>> hour or two, to avoid this timer firing storm, but that doesn't sound like
>> it would work well for you.
>>
>> You might instead try using broadcast state to deal with this. You would
>> establish a broadcast stream connected to your keyed stream that acts as a
>> control stream for the keyed state. Then in the processBroadcastElement
>> method of a KeyedBroadcastProcessFunction you would use applyToKeyedState to
>> iterate over all the keyed state and clear everything. Unfortunately it's
>> not possible to use timers on broadcast state, so you'll have to find some
>> other way to trigger the event on the broadcast stream -- maybe a custom
>> source that uses a ProcessingTimeCallback to create events on the broadcast
>> stream.
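[Editor's note] David's point about spreading the per-key timers over an interval can be made concrete with a stable per-key offset derived from the key's hash. This helper is not from the thread; the class and method names are hypothetical, and it is plain Java so it can stand alone:

```java
import java.time.Duration;

public class TimerJitter {

    /**
     * Spread cleanup timers over [windowStartMs, windowStartMs + spreadMs)
     * by deriving a deterministic per-key offset from the key's hash, so
     * that not all keys fire at the same instant.
     */
    public static long cleanupTime(long windowStartMs, long spreadMs, String key) {
        // mask to a non-negative value before taking the modulus
        long offset = (key.hashCode() & 0x7fffffffL) % spreadMs;
        return windowStartMs + offset;
    }

    public static void main(String[] args) {
        long spread = Duration.ofHours(1).toMillis();
        // the same key always lands on the same instant within the hour
        System.out.println(cleanupTime(0L, spread, "user-42"));
    }
}
```

Inside a ProcessFunction one would register this value with `ctx.timerService().registerProcessingTimeTimer(...)` instead of the exact window boundary, trading punctual cleanup for a bounded firing storm.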
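[Editor's note] A minimal sketch of the KeyedBroadcastProcessFunction approach David outlines, assuming the Flink 1.6 APIs. The String element type, the `seenIds` state name, and the use of a Long timestamp as the control event are illustrative assumptions, not from the thread:

```java
import org.apache.flink.api.common.state.KeyedStateFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Key type String, data element type String (an id), control element type
// Long (a cleanup signal), output type String.
public class DailyStateCleaner
        extends KeyedBroadcastProcessFunction<String, String, Long, String> {

    // per-key dedup state: ids seen so far
    private final MapStateDescriptor<String, Boolean> seenIdsDesc =
            new MapStateDescriptor<>("seenIds", String.class, Boolean.class);

    @Override
    public void processElement(String id, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        MapState<String, Boolean> seenIds = getRuntimeContext().getMapState(seenIdsDesc);
        if (!seenIds.contains(id)) {
            seenIds.put(id, true);
            out.collect(id); // emit only the first occurrence per key
        }
    }

    @Override
    public void processBroadcastElement(Long cleanupSignal, Context ctx, Collector<String> out)
            throws Exception {
        // applyToKeyedState visits the state of every key registered with
        // this operator, so one control event truncates all the MapStates
        ctx.applyToKeyedState(seenIdsDesc,
                new KeyedStateFunction<String, MapState<String, Boolean>>() {
                    @Override
                    public void process(String key, MapState<String, Boolean> state)
                            throws Exception {
                        state.clear();
                    }
                });
    }
}
```

The keyed stream would be connected to the control stream via `keyedStream.connect(controlStream.broadcast(...))`, with the control stream fed by a custom source that emits a signal at each day boundary, along the lines David suggests.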
>>
>> David
>>
>> On Fri, Sep 14, 2018 at 7:18 AM Paul Lam <paullin3...@gmail.com> wrote:
>> >
>> > Hi vino,
>> >
>> > Thanks for the advice, but I think State TTL does not completely fit my
>> > case.
>> >
>> > AFAIK, State TTL works at the per-entry level and uses an inactivity
>> > threshold to expire entries, but I need a TTL for the whole MapState that
>> > does not depend on when the entries are created or updated. Suppose I'm
>> > calculating stats of daily active users and use a userId field as the
>> > key; I want the state truncated entirely at the very beginning of each
>> > day.
>> >
>> > Thanks a lot!
>> >
>> > Best,
>> > Paul Lam
>> >
>> > On Sep 14, 2018, at 10:39, vino yang <yanghua1...@gmail.com> wrote:
>> >
>> > Hi Paul,
>> >
>> > Maybe you can take a look at State TTL? [1]
>> >
>> > Thanks,
>> > vino
>> >
>> > [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html#state-time-to-live-ttl
>> >
>> > Paul Lam <paullin3...@gmail.com> wrote on Wed, Sep 12, 2018 at 6:06 PM:
>> >>
>> >> Hi,
>> >>
>> >> I'm using MapState to deduplicate some ids, and the MapState needs to be
>> >> truncated periodically. I tried to use ProcessingTimeCallback to call
>> >> state.clear(), but that way I can only clear the state for one key,
>> >> while I actually need a key-group-level cleanup. So I'm wondering, is
>> >> there any best practice for my case? Thanks a lot!
>> >>
>> >> Best,
>> >> Paul Lam
>>
>> --
>> David Anderson | Training Coordinator | data Artisans
>> --
>> Join Flink Forward - The Apache Flink Conference
>> Stream Processing | Event Driven | Real Time
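[Editor's note] For reference, enabling the per-entry State TTL that vino links to looks roughly like this against the Flink 1.6 API (the `seenIds` descriptor is a hypothetical example). As Paul points out, this expires each entry individually, a fixed time after it is written, rather than truncating the whole MapState at day boundaries:

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

MapStateDescriptor<String, Boolean> seenIdsDesc =
        new MapStateDescriptor<>("seenIds", String.class, Boolean.class);

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(1))                                   // entry lifetime
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // refresh on write
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

seenIdsDesc.enableTimeToLive(ttlConfig);
```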