*Resources:*

If you use the heap state backend, the cleanup happens while processing
records, in the same thread, so there is a direct connection with the number
of cores. If you use the RocksDB state backend, extra CPUs can be used by
async compaction, which should speed up the background cleanup.
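
For reference, here is a minimal sketch of how both cleanup strategies can be
requested on one TTL config, assuming the Flink 1.10 `StateTtlConfig` API. The
class name, the state name, the value types and the numbers (10 entries per
step, 1000 entries per timestamp refresh) are placeholders I picked for
illustration, not recommendations. Each backend only applies the strategy it
supports.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

class TtlConfigSketch {
    // Hypothetical state name and types, used only for illustration.
    static MapStateDescriptor<String, String> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(1))
            // Heap backend: check up to 10 entries on each state access and,
            // because of the 'true' flag, also on every processed record.
            .cleanupIncrementally(10, true)
            // RocksDB backend: drop expired entries during compaction and
            // refresh the current timestamp after every 1000 entries.
            .cleanupInRocksdbCompactFilter(1000)
            .build();

        MapStateDescriptor<String, String> descriptor =
            new MapStateDescriptor<>("my-map-state", String.class, String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```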

*Incremental cleanup semantics:*

> if no access happens to that state
This means no access to any key: practically, no data hits the pipeline and
no processing is happening. As mentioned, heap cleanup happens while
processing records or accessing any state key, not necessarily the key which
needs cleanup.

> I want to make sure that those keys are eventually cleaned up without
> having to "read" from them.
I think this should be OK for both backends. For heap, though, if the
pipeline stalls, then the cleanup is on hold as well.
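
To make the heap semantics concrete, here is a toy model in plain Java. It is
not Flink's implementation, only a sketch of the idea under my own
assumptions: every access to any key also checks a few stored entries in
round-robin order and drops the expired ones. `ToyTtlMap`, its methods and
the minute-based timestamps are all made up for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class ToyTtlMap {
    private final Map<String, Long> lastUpdate = new HashMap<>();
    private final Deque<String> sweepOrder = new ArrayDeque<>();
    private final long ttl;         // time-to-live, in the same unit as 'now'
    private final int cleanupSize;  // entries checked per access

    ToyTtlMap(long ttl, int cleanupSize) {
        this.ttl = ttl;
        this.cleanupSize = cleanupSize;
    }

    // Write a key at time 'now'; the access also triggers one cleanup step.
    void put(String key, long now) {
        if (lastUpdate.put(key, now) == null) {
            sweepOrder.addLast(key);
        }
        cleanupStep(now);
    }

    // Read a key at time 'now'; expired entries are never reported as present.
    boolean contains(String key, long now) {
        cleanupStep(now);
        Long ts = lastUpdate.get(key);
        return ts != null && now - ts < ttl;
    }

    // Raw check of what is physically stored; does not trigger any cleanup.
    boolean isStored(String key) {
        return lastUpdate.containsKey(key);
    }

    // Check up to 'cleanupSize' entries, whichever key caused the access.
    private void cleanupStep(long now) {
        int n = Math.min(cleanupSize, sweepOrder.size());
        for (int i = 0; i < n; i++) {
            String key = sweepOrder.pollFirst();
            if (now - lastUpdate.get(key) >= ttl) {
                lastUpdate.remove(key);   // expired: drop it
            } else {
                sweepOrder.addLast(key);  // still live: revisit later
            }
        }
    }

    public static void main(String[] args) {
        ToyTtlMap state = new ToyTtlMap(60, 2);  // TTL of 60 minutes
        state.put("A", 30);                      // "A" last updated at minute 30
        // Minute 120 passes with no traffic: "A" is expired but still stored.
        System.out.println("A stored before any access: " + state.isStored("A"));
        state.put("B", 120);                     // a record for a different key arrives
        System.out.println("A stored after access to B: " + state.isStored("A"));
    }
}
```

In this model "A" goes away as soon as any record touches the state after the
TTL has passed, and it lingers for as long as nothing is processed, which is
the stall case described above.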

*Serialization:*

I am not a deep expert here, and I am not sure I fully follow the problem.
I would suggest starting another thread with more details to make it
visible to other people as well.

In general, what you see in the docs about serializers is important for
state as well.
So you have to be aware of which objects you keep in state and exchange
between operators, and make sure they are serializable for Flink.
Pay attention to migration if you want to evolve the types of your objects;
it is not a trivial topic [1].

Best,
Andrey

On Fri, Mar 20, 2020 at 9:39 AM Matthew Rafael Magsombol <
raffy4...@gmail.com> wrote:

> And another additional followup!
> ( Sorry if there are a lot of followups! We've run Flink consumers, but
> these are very basic consumers without state! ).
>
> Suppose I want to use a MapState[String, <Thrift_object>]...
> in order to make that happen, following this link
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html
>  should
> suffice right?
> And let's suppose after this state access, I want to emit a variant of
> this <thrift_object> to the next operator...then I presume I need to let
> flink know how to serialize this message via kryoSerializer:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html
>  ?
>
> ( this is a bit off topic by this point now! )
>
> On Thu, Mar 19, 2020 at 11:25 PM Matthew Rafael Magsombol <
> raffy4...@gmail.com> wrote:
>
>> Also as a follow up question with respect to state cleanup,
>> I see that there's an incremental cleanup option:
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
>> It has notes indicating that if no access happens to that state/no
>> records processed, then that expired state persists...
>>
>> So just for clarification, let's suppose I have a key named "A". "A" has
>> a Ttl of 1 hour and "A" was last updated at the 30th minute.
>> Once the 1-hour mark has passed, if I don't receive any message that has
>> key "A" and never query it from state, does that mean that "A" will just
>> hang around? Or will it eventually get cleaned up?
>>
>> I see that for rocksdb, it runs an async compaction. In this scenario,
>> key "A" will eventually be cleaned up even if we don't access and update it
>> after that 1 hour TTL right?
>>
>> Yeah, I just want to make sure that for state where the last time I've
>> updated them was probably earlier on than the TTL and never updated, I want
>> to make sure that those keys are
>> eventually cleaned up without having to "read" from them. It sounds like
>> RocksDB cleans these up via compaction, but what about state where we
>> use FsStateBackend, which keeps in-flight data on the heap?
>>
>> On Thu, Mar 19, 2020 at 7:07 PM Matthew Rafael Magsombol <
>> raffy4...@gmail.com> wrote:
>>
>>> I see...
>>> The way we run our setup is that we run these in a kubernetes cluster
>>> where we have one cluster running one job.
>>> The total parallelism of the whole cluster is equal to the number of
>>> taskmanagers where each task manager has 1 core cpu accounting for 1 slot.
>>> If we add a state ttl, do you have any recommendation as to how much I
>>> should bump the cpu per task manager? 2 cores per task manager with 1 slot
>>> per task manager ( and the other cpu core will be used for TTL state
>>> cleanup? ).
>>> Or is that overkill?
>>>
>>> On Thu, Mar 19, 2020 at 12:56 PM Andrey Zagrebin <azagre...@apache.org>
>>> wrote:
>>>
>>>> Hi Matt,
>>>>
>>>> Generally speaking, using state with TTL in Flink should not differ a
>>>> lot from just using Flink with state [1].
>>>> You have to provision your system so that it can hold 7 days' worth of
>>>> state.
>>>>
>>>> The existing Flink state backends provide background cleanup to
>>>> automatically remove the expired state eventually,
>>>> so that your application does not need to do any explicit access of the
>>>> expired state to clean it.
>>>> The background cleanup is active by default since Flink 1.10 [2].
>>>>
>>>> Enabling TTL for state, of course, comes at a price because you need to
>>>> store a timestamp and spend CPU cycles on the background cleanup.
>>>> This affects storage size and potentially processing latency per record.
>>>> You can read about details and caveats in the docs: for heap state [3]
>>>> and RocksDB [4].
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-of-expired-state
>>>> [3]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#incremental-cleanup
>>>> [4]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>>
>>>> On Thu, Mar 19, 2020 at 6:48 PM Matt Magsombol <raffy4...@gmail.com>
>>>> wrote:
>>>>
>>>>> Suppose I'm using state stored in-memory that has a TTL of 7 days max.
>>>>> Should I run into any issues with state this long other than potential OOM?
>>>>>
>>>>> Let's suppose I extend this such that we add rocksdb...any concerns
>>>>> with this with respect to maintenance?
>>>>>
>>>>> Most of the examples that I've been seeing seem to pair state with
>>>>> timewindows but I'll only read from this state every 15 seconds ( or some
>>>>> small timewindow ). After each timewindow, I *won't* be cleaning up the
>>>>> data within the state b/c I'll need to re-lookup from this state on future
>>>>> time windows. I'll effectively rely on TTL based on key expiration time,
>>>>> and I was wondering what potential issues I should watch out for.
>>>>>
>>>>
