Gotcha, ok.
Thanks! I think this is everything I need to know for now!
I can get around using Thrift as a state data type by using a generic Flink
data type, and upon read, I can convert it to the Thrift data type to pass
to my sink.
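
( For anyone who finds this thread later: a rough sketch of the workaround
I have in mind, where EventState is a placeholder case class and
ThriftEvent stands in for my generated Thrift class: )

    import org.apache.flink.api.common.functions.MapFunction

    // plain case class kept in Flink state, handled by Flink's own
    // case-class serializer instead of falling back to Kryo for Thrift
    case class EventState(id: String, count: Long)

    // convert to the Thrift type (ThriftEvent is a placeholder name)
    // only at the edge, right before handing records to the sink
    class ToThriftEvent extends MapFunction[EventState, ThriftEvent] {
      override def map(s: EventState): ThriftEvent = {
        val t = new ThriftEvent()
        t.setId(s.id)
        t.setCount(s.count)
        t
      }
    }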


On Fri, Mar 20, 2020 at 1:15 AM Andrey Zagrebin <azagre...@apache.org>
wrote:

> *Resources:*
>
> If you use the heap state backend, the cleanup happens while processing
> records in the same thread, so there is a direct connection with the
> number of cores.
> If you use the RocksDB state backend, extra CPUs can be used by the async
> compaction and should speed up the background cleanup.
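>
> For reference, the RocksDB cleanup is configured on StateTtlConfig; a
> rough sketch (1000 is just an example value controlling how often the
> filter refreshes its cached timestamp):
>
>     import org.apache.flink.api.common.state.StateTtlConfig
>     import org.apache.flink.api.common.time.Time
>
>     val ttlConfig = StateTtlConfig
>       .newBuilder(Time.days(7))
>       // drop expired entries while RocksDB compacts; refresh the cached
>       // current timestamp after every 1000 processed state entries
>       .cleanupInRocksdbCompactFilter(1000)
>       .build()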
>
> *Incremental cleanup semantics:*
>
> > if no access happens to that state
> This means no access to any key. Practically, no data hits the pipeline
> and no processing is happening. As mentioned, heap cleanup happens while
> processing records or accessing any state key, not necessarily the key
> which needs cleanup.
>
> >  I want to make sure that those keys are eventually cleaned up without
> having to "read" from them.
> I think this should be ok for both backends. Just note that for heap, if
> the pipeline stalls then the cleanup is on hold as well.
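>
> For completeness, the heap incremental cleanup is configured the same
> way; a rough sketch (the arguments are example values):
>
>     import org.apache.flink.api.common.state.StateTtlConfig
>     import org.apache.flink.api.common.time.Time
>
>     val ttlConfig = StateTtlConfig
>       .newBuilder(Time.days(7))
>       // check up to 10 entries per state access; `false` means no
>       // additional cleanup per processed record
>       .cleanupIncrementally(10, false)
>       .build()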
>
> *Serialization*
>
> I am not a deep expert here and I am not sure I fully follow the problem.
> I would suggest starting another thread with more details to make it
> visible for other people as well.
>
> In general, what you see in the docs about serializers is important for
> state as well.
> So you have to be aware of which objects you keep in state and exchange
> between operators, and make sure they are serializable by Flink.
> Pay attention to migration if you want to evolve the types of your
> objects; it is not a trivial topic [1].
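>
> As one concrete habit that helps later migration, you can pass the
> TypeInformation into the state descriptor explicitly, so the serializer
> that ends up in your savepoints is a deliberate choice (MyType is a
> placeholder):
>
>     import org.apache.flink.api.common.state.MapStateDescriptor
>     import org.apache.flink.streaming.api.scala.createTypeInformation
>
>     // MyType is a placeholder for your own state value class
>     val descriptor = new MapStateDescriptor[String, MyType](
>       "my-state",
>       createTypeInformation[String],
>       createTypeInformation[MyType])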
>
> Best,
> Andrey
>
> On Fri, Mar 20, 2020 at 9:39 AM Matthew Rafael Magsombol <
> raffy4...@gmail.com> wrote:
>
>> And another additional followup!
>> ( Sorry if there are a lot of followups! We've run Flink consumers, but
>> these are very basic consumers without state! )
>>
>> Suppose I want to use a MapState[String, <Thrift_object>]...
>> In order to make that happen, following this link
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html
>> should suffice, right?
>> And let's suppose that after this state access, I want to emit a variant
>> of this <thrift_object> to the next operator... then I presume I need to
>> let Flink know how to serialize this message via a Kryo serializer:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/custom_serializers.html
>> ?
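>>
>> ( Concretely, I'm guessing something along these lines per that second
>> link, with MyThriftEvent standing in for my generated Thrift class and
>> com.twitter:chill-thrift on the classpath: )
>>
>>     import com.twitter.chill.thrift.TBaseSerializer
>>     import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     // let Kryo handle Thrift-generated classes via chill-thrift;
>>     // MyThriftEvent is a placeholder for the generated class
>>     env.getConfig.addDefaultKryoSerializer(
>>       classOf[MyThriftEvent], classOf[TBaseSerializer])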
>>
>> ( this is a bit off topic by this point now! )
>>
>> On Thu, Mar 19, 2020 at 11:25 PM Matthew Rafael Magsombol <
>> raffy4...@gmail.com> wrote:
>>
>>> Also, as a follow-up question with respect to state cleanup,
>>> I see that there's an incremental cleanup option:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup
>>> It has a note indicating that if no access happens to that state and no
>>> records are processed, then the expired state persists...
>>>
>>> So just for clarification, let's suppose I have a key named "A". "A" has
>>> a TTL of 1 hour, and "A" was last updated at the 30th minute.
>>> Any time after the one-hour mark, if I don't receive any message with
>>> key "A" (and thus never query it from state), does that mean that
>>> "A" will just hang around? Or will it eventually get cleaned up?
>>>
>>> I see that for RocksDB, cleanup happens during async compaction. In this
>>> scenario, key "A" will eventually be cleaned up even if we don't access
>>> or update it after that 1-hour TTL, right?
>>>
>>> Yeah, for state whose last update happened earlier than the TTL and
>>> which is never updated again, I just want to make sure that those keys
>>> are eventually cleaned up without having to "read" from them. It sounds
>>> like RocksDB cleans these up via compaction, but what about states where
>>> we use the FsStateBackend, which keeps in-flight data on the heap?
>>>
>>> On Thu, Mar 19, 2020 at 7:07 PM Matthew Rafael Magsombol <
>>> raffy4...@gmail.com> wrote:
>>>
>>>> I see...
>>>> The way we run our setup is that we run these in a Kubernetes cluster
>>>> where each cluster runs one job.
>>>> The total parallelism of the whole cluster is equal to the number of
>>>> task managers, where each task manager has one CPU core accounting for
>>>> one slot.
>>>> If we add a state TTL, do you have any recommendation as to how much I
>>>> should bump the CPU per task manager? Two cores per task manager with
>>>> one slot per task manager ( with the other CPU core used for TTL state
>>>> cleanup )?
>>>> Or is that overkill?
>>>>
>>>> On Thu, Mar 19, 2020 at 12:56 PM Andrey Zagrebin <azagre...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Matt,
>>>>>
>>>>> Generally speaking, using state with TTL in Flink should not differ a
>>>>> lot from just using Flink with state [1].
>>>>> You have to provision your system so that it can keep state whose size
>>>>> is worth 7 days of data.
>>>>>
>>>>> The existing Flink state backends provide background cleanup to
>>>>> automatically remove the expired state eventually,
>>>>> so that your application does not need to explicitly access the
>>>>> expired state to clean it up.
>>>>> The background cleanup is active by default since Flink 1.10 [2].
>>>>>
>>>>> Enabling TTL for state, of course, comes at a price, because you need
>>>>> to store a timestamp and spend CPU cycles on the background cleanup.
>>>>> This affects storage size and potentially the processing latency per
>>>>> record.
>>>>> You can read about the details and caveats in the docs: for heap state
>>>>> [3] and RocksDB [4].
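>>>>>
>>>>> For example, enabling a 7-day TTL on a state descriptor looks roughly
>>>>> like this (a sketch; the update type and visibility shown are the
>>>>> defaults):
>>>>>
>>>>>     import org.apache.flink.api.common.state.StateTtlConfig
>>>>>     import org.apache.flink.api.common.state.ValueStateDescriptor
>>>>>     import org.apache.flink.api.common.time.Time
>>>>>
>>>>>     val ttlConfig = StateTtlConfig
>>>>>       .newBuilder(Time.days(7))
>>>>>       // restart the TTL clock on create and write, and never return
>>>>>       // values that have already expired
>>>>>       .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>>       .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>>       .build()
>>>>>
>>>>>     val descriptor =
>>>>>       new ValueStateDescriptor[String]("my-state", classOf[String])
>>>>>     descriptor.enableTimeToLive(ttlConfig)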
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-of-expired-state
>>>>> [3]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#incremental-cleanup
>>>>> [4]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#cleanup-during-rocksdb-compaction
>>>>>
>>>>> On Thu, Mar 19, 2020 at 6:48 PM Matt Magsombol <raffy4...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Suppose I'm using state stored in-memory that has a TTL of 7 days
>>>>>> max. Should I run into any issues with state this long, other than
>>>>>> potential OOM?
>>>>>>
>>>>>> Let's suppose I extend this such that we add RocksDB... any concerns
>>>>>> with respect to maintenance?
>>>>>>
>>>>>> Most of the examples that I've been seeing seem to pair state with
>>>>>> time windows, but I'll only read from this state every 15 seconds ( or
>>>>>> some small time window ). After each time window, I *won't* be
>>>>>> cleaning up the data within the state, because I'll need to look it up
>>>>>> again from this state in future time windows. I'll effectively rely on
>>>>>> TTL based on key expiration time, and I was wondering what potential
>>>>>> issues I should watch out for.
>>>>>>
>>>>>
