Gotcha, ok, thanks! I think this is everything I need to know for now. I can get around using Thrift as a state data type by keeping a generic Flink data type in state, and upon read, converting it to the Thrift data type to pass to my sink.
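Concretely, something like the following is what I have in mind (a rough, untested sketch in Scala; `Event` stands in for our own case class, `ThriftEvent` for the generated Thrift class, and `eventStream` / `thriftSink` are assumed to already exist):

import org.apache.flink.streaming.api.scala._

// Keep a plain case class in state and between operators so Flink can
// use its own serializers instead of falling back to Kryo for Thrift.
case class Event(id: String, count: Long)

// Convert to the generated Thrift type only at the very end, right
// before handing records to the Thrift-aware sink.
val thriftStream: DataStream[ThriftEvent] = eventStream.map { e: Event =>
  val t = new ThriftEvent()
  t.setId(e.id)       // assumes the usual Java-style setters Thrift generates
  t.setCount(e.count)
  t
}
thriftStream.addSink(thriftSink)

That way Thrift objects only exist on the last hop into the sink and never have to live in Flink state.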
On Fri, Mar 20, 2020 at 1:15 AM Andrey Zagrebin <azagre...@apache.org> wrote:

> *Resources:*
>
> If you use the heap state backend, the cleanup happens while processing
> records in the same thread, so there is a direct connection with the
> number of cores. If you use the RocksDB state backend, extra CPUs can be
> used by async compaction and should speed up the background cleanup.
>
> *Incremental cleanup semantics:*
>
> > if no access happens to that state
>
> This means no access to any key: practically, no data hits the pipeline
> and no processing is happening. As mentioned, heap cleanup happens while
> processing records or accessing any state key, not necessarily the key
> which needs cleanup.
>
> > I want to make sure that those keys are eventually cleaned up without
> > having to "read" from them.
>
> I think this should be ok for both backends. Just note that for heap, if
> the pipeline stalls then the cleanup is on hold as well.
>
> *Serialization*
>
> I am not a deep expert here and am not sure I fully follow the problem.
> I would suggest starting another thread with more details to make it
> visible for other people as well.
>
> In general, what you see in the docs about serializers is important for
> state as well. You have to be aware of which objects you keep in state
> and exchange between operators, and make sure they are serializable for
> Flink. Pay attention to migration if you want to evolve the types of
> your objects; it is not a trivial topic [1].
>
> Best,
> Andrey
>
> On Fri, Mar 20, 2020 at 9:39 AM Matthew Rafael Magsombol <
> raffy4...@gmail.com> wrote:
>
>> And another follow-up! (Sorry if there are a lot of follow-ups! We have
>> run Flink consumers before, but only very basic consumers without
>> state!)
>>
>> Suppose I want to use a MapState[String, <Thrift_object>]...
>> In order to make that happen, following this link should suffice, right?
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html
>> And let's suppose that after this state access, I want to emit a variant
>> of this <thrift_object> to the next operator... then I presume I need to
>> let Flink know how to serialize this message via a Kryo serializer:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html
>> ?
>>
>> (This is a bit off topic by this point now!)
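(Note for the archives: for the Kryo part of my question quoted above, the custom-serializers page suggests registering chill-thrift's TBaseSerializer for the generated classes. Roughly like this, again untested, with `ThriftEvent` as a placeholder and the com.twitter:chill-thrift dependency added to the build:)

import com.twitter.chill.thrift.TBaseSerializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Route every instance of the generated Thrift class through
// chill-thrift's TBaseSerializer whenever Flink falls back to Kryo
// (covers both state and records exchanged between operators).
env.getConfig.addDefaultKryoSerializer(classOf[ThriftEvent], classOf[TBaseSerializer])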
>> On Thu, Mar 19, 2020 at 11:25 PM Matthew Rafael Magsombol <
>> raffy4...@gmail.com> wrote:
>>
>>> Also, as a follow-up question with respect to state cleanup:
>>> I see that there is an incremental cleanup option:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
>>> Its notes indicate that if no access happens to that state and no
>>> records are processed, then the expired state persists...
>>>
>>> So just for clarification, let's suppose I have a key named "A". "A"
>>> has a TTL of 1 hour, and "A" was last updated at the 30th minute.
>>> At any time after the 1-hour mark, if I don't receive any message with
>>> key "A" and never try to query it from state, does that mean that "A"
>>> will just hang around? Or will it eventually get cleaned up?
>>>
>>> I see that for RocksDB, it runs an async compaction. In this scenario,
>>> key "A" will eventually be cleaned up even if we don't access or update
>>> it after that 1-hour TTL, right?
>>>
>>> Yeah, I just want to make sure that keys whose state was last updated
>>> longer ago than the TTL, and never updated again, are eventually
>>> cleaned up without having to "read" from them. It sounds like RocksDB
>>> cleans these up via compaction, but what about states where we use the
>>> FsStateBackend, which keeps in-flight data on the heap?
>>>
>>> On Thu, Mar 19, 2020 at 7:07 PM Matthew Rafael Magsombol <
>>> raffy4...@gmail.com> wrote:
>>>
>>>> I see...
>>>> The way we run our setup is that we run these in a Kubernetes cluster
>>>> where one cluster runs one job.
>>>> The total parallelism of the whole cluster is equal to the number of
>>>> task managers, where each task manager has 1 CPU core, accounting for
>>>> 1 slot. If we add a state TTL, do you have any recommendation as to
>>>> how much I should bump the CPU per task manager? 2 cores per task
>>>> manager with 1 slot per task manager (with the other core used for
>>>> TTL state cleanup)? Or is that overkill?
>>>>
>>>> On Thu, Mar 19, 2020 at 12:56 PM Andrey Zagrebin <azagre...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Matt,
>>>>>
>>>>> Generally speaking, using state with TTL in Flink should not differ
>>>>> a lot from just using Flink with state [1].
>>>>> You have to provision your system so that it can keep state worth
>>>>> 7 days of data.
>>>>>
>>>>> The existing Flink state backends provide background cleanup to
>>>>> automatically remove the expired state eventually, so that your
>>>>> application does not need to do any explicit access of the expired
>>>>> state to clean it. The background cleanup is active by default since
>>>>> Flink 1.10 [2].
>>>>>
>>>>> Enabling TTL for state, of course, comes at a price, because you
>>>>> need to store a timestamp and spend CPU cycles on the background
>>>>> cleanup. This affects storage size and potentially processing
>>>>> latency per record. You can read about the details and caveats in
>>>>> the docs: for heap state [3] and RocksDB [4].
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-of-expired-state
>>>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#incremental-cleanup
>>>>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>>>
>>>>> On Thu, Mar 19, 2020 at 6:48 PM Matt Magsombol <raffy4...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Suppose I'm using state stored in-memory that has a TTL of 7 days
>>>>>> max. Should I run into any issues with state this long, other than
>>>>>> potential OOM?
>>>>>>
>>>>>> Let's suppose I extend this such that we add RocksDB... any
>>>>>> concerns with this with respect to maintenance?
>>>>>>
>>>>>> Most of the examples I've been seeing seem to pair state with time
>>>>>> windows, but I'll only read from this state every 15 seconds (or
>>>>>> some small time window). After each time window, I *won't* be
>>>>>> cleaning up the data within the state, because I'll need to look it
>>>>>> up again from this state in future time windows. I'll effectively
>>>>>> rely on TTL based on key expiration time, and I was wondering what
>>>>>> potential issues I should watch out for with this.
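(And to close the loop on my original question quoted above: here is roughly the TTL setup I'm planning, sketched against Flink 1.10 and untested; `ThriftEvent` and the state name are placeholders:)

import org.apache.flink.api.common.state.{MapStateDescriptor, StateTtlConfig}
import org.apache.flink.api.common.time.Time

// Entries expire 7 days after they were created or last updated.
val ttlConfig = StateTtlConfig
  .newBuilder(Time.days(7))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  // Heap backends: piggyback cleanup on state accesses, 10 entries at a time.
  .cleanupIncrementally(10, false)
  // RocksDB: let the compaction filter drop expired entries; refresh the
  // timestamp it compares against after every 1000 processed entries.
  .cleanupInRocksdbCompactFilter(1000)
  .build()

val descriptor = new MapStateDescriptor[String, ThriftEvent](
  "my-state", classOf[String], classOf[ThriftEvent])
descriptor.enableTimeToLive(ttlConfig)

From my reading of the docs, configuring both cleanup strategies is fine; only the one matching the active state backend takes effect.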