And another followup! (Sorry if there are a lot of followups! We've run a Flink consumer before, but those were very basic consumers without state.)
Suppose I want to use a MapState[String, <Thrift_object>]. To make that happen, following this link should suffice, right?
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html
And let's suppose that after this state access I want to emit a variant of this <thrift_object> to the next operator... then I presume I need to let Flink know how to serialize that message via a Kryo serializer, per
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html ?
(This is a bit off topic by this point!)

On Thu, Mar 19, 2020 at 11:25 PM Matthew Rafael Magsombol <raffy4...@gmail.com> wrote:

> Also, as a follow-up question with respect to state cleanup,
> I see that there's an incremental cleanup option:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
> It notes that if no access happens to that state and no records are
> processed, then the expired state persists...
>
> So just for clarification: let's suppose I have a key named "A". "A" has a
> TTL of 1 hour, and "A" was last updated at the 30th minute.
> Once I pass the one-hour mark, if I never receive another message with
> key "A" and never try to query it from state, does that mean "A" will
> just hang around? Or will it eventually get cleaned up?
>
> I see that for RocksDB, cleanup runs as an async compaction. In this
> scenario, key "A" will eventually be cleaned up even if we don't access
> or update it after that 1-hour TTL, right?
>
> To sum up: for state whose last update happened well before the TTL
> expired and which is never touched again, I want to make sure those keys
> are eventually cleaned up without having to "read" from them. It sounds
> like RocksDB cleans these up via compaction, but what about the
> FsStateBackend, where in-flight data lives on the heap?
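For what it's worth, here's a rough Scala sketch of the setup asked about above, under a couple of assumptions: `MyThriftEvent` is a hypothetical Thrift-generated class, and registering chill-thrift's `TBaseSerializer` with Kryo is the approach the linked custom-serializers page describes (it needs the `com.twitter:chill-thrift` and `org.apache.thrift:libthrift` dependencies). This is a sketch, not a definitive implementation:

```scala
import com.twitter.chill.thrift.TBaseSerializer
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Thrift-generated classes are not POJOs, so Flink falls back to Kryo for
// them. chill-thrift's TBaseSerializer teaches Kryo how to (de)serialize any
// TBase subtype, covering both state access and records sent between
// operators.
env.getConfig.addDefaultKryoSerializer(
  classOf[MyThriftEvent], classOf[TBaseSerializer])

// Descriptor for the MapState[String, MyThriftEvent]. With the registration
// above, Kryo handles the value type; a hand-written TypeSerializer (per the
// custom_serialization page) is only needed if Kryo proves too slow or you
// need schema evolution guarantees.
val stateDescriptor = new MapStateDescriptor[String, MyThriftEvent](
  "events-by-id", classOf[String], classOf[MyThriftEvent])
```

(Whether you also need a fully custom `TypeSerializer` for the state depends on performance and schema-evolution requirements; Kryo fallback works, but it is the slowest option.)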
> On Thu, Mar 19, 2020 at 7:07 PM Matthew Rafael Magsombol <raffy4...@gmail.com> wrote:
>
>> I see...
>> The way we run our setup is that we run these in a Kubernetes cluster
>> where each cluster runs one job.
>> The total parallelism of the whole cluster is equal to the number of
>> task managers, where each task manager has 1 CPU core, accounting for
>> 1 slot.
>> If we add a state TTL, do you have any recommendation as to how much I
>> should bump the CPU per task manager? 2 cores per task manager with 1
>> slot per task manager (the other CPU core being used for TTL state
>> cleanup)? Or is that overkill?
>>
>> On Thu, Mar 19, 2020 at 12:56 PM Andrey Zagrebin <azagre...@apache.org> wrote:
>>
>>> Hi Matt,
>>>
>>> Generally speaking, using state with TTL in Flink should not differ
>>> much from just using Flink with state [1].
>>> You have to provision your system so that it can hold state worth
>>> 7 days of data.
>>>
>>> The existing Flink state backends provide background cleanup to
>>> automatically remove the expired state eventually,
>>> so your application does not need to explicitly access the expired
>>> state to clean it up.
>>> Background cleanup is active by default since Flink 1.10 [2].
>>>
>>> Enabling TTL for state, of course, comes at a price, because you need
>>> to store a timestamp and spend CPU cycles on the background cleanup.
>>> This affects storage size and potentially processing latency per record.
>>> You can read about details and caveats in the docs: for heap state [3]
>>> and RocksDB [4].
>>>
>>> Best,
>>> Andrey
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-of-expired-state
>>> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#incremental-cleanup
>>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>
>>> On Thu, Mar 19, 2020 at 6:48 PM Matt Magsombol <raffy4...@gmail.com> wrote:
>>>
>>>> Suppose I'm using state stored in memory that has a TTL of 7 days max.
>>>> Should I run into any issues with state this long, other than
>>>> potential OOM?
>>>>
>>>> Let's suppose I extend this such that we add RocksDB... any concerns
>>>> with this with respect to maintenance?
>>>>
>>>> Most of the examples I've been seeing seem to pair state with time
>>>> windows, but I'll only read from this state every 15 seconds (or some
>>>> small time window). After each time window, I *won't* be cleaning up
>>>> the data within the state, b/c I'll need to re-look-up this state in
>>>> future time windows. I'll effectively rely on TTL based on key
>>>> expiration time, and I was wondering what potential issues I should
>>>> watch out for.
>>>>
>>>
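Putting the TTL discussion in this thread into code: a rough sketch of a state descriptor with a 1-hour TTL that enables both background cleanup strategies mentioned (incremental cleanup for the heap backends, compaction filter for RocksDB); the descriptor name and value type are made up for illustration, and the tuning numbers are just the ones from the Flink 1.10 docs, not recommendations:

```scala
import org.apache.flink.api.common.state.{MapStateDescriptor, StateTtlConfig}
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.hours(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  // Heap backends: check up to 10 entries per state access, and (second arg
  // = true) also advance the cleanup on every processed record, even when
  // state is not accessed. Caveat from the docs: if no records are processed
  // at all, expired entries can linger until the next access.
  .cleanupIncrementally(10, true)
  // RocksDB backend: drop expired entries during background compaction;
  // refresh the current timestamp in the filter after every 1000 processed
  // state entries.
  .cleanupInRocksdbCompactFilter(1000)
  .build()

// Hypothetical descriptor; each backend picks the cleanup strategy that
// applies to it, so one config can serve both deployment modes.
val stateDescriptor = new MapStateDescriptor[String, String](
  "events-by-id", classOf[String], classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
```

This matches what Andrey's reply describes: with either strategy enabled, expired entries are eventually removed in the background without the application having to read them.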