Thanks Stefan. The logical data model of Map<EventKey, Map<UserKey,
Value>> makes total sense. A related question: MapState supports
iteration. What is the encoding format at the RocksDB layer? Or rather,
how can a user control the user-key encoding?

I assume the implementation uses a compound key format of EventKey
followed by UserKey. Let's say UserKey is an int or long. If it is
encoded big-endian, iteration should return the UserKeys in order, as
they are stored sorted in RocksDB.
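
To illustrate what I have in mind (purely my own sketch, not Flink's actual
key serializer -- the class and method names are made up):

import java.nio.ByteBuffer;

public class KeyEncodingSketch {
    // Compound key: serialized EventKey bytes followed by the big-endian UserKey.
    static byte[] encode(byte[] eventKeyBytes, long userKey) {
        ByteBuffer buf = ByteBuffer.allocate(eventKeyBytes.length + Long.BYTES);
        buf.put(eventKeyBytes);
        buf.putLong(userKey); // ByteBuffer writes big-endian by default
        return buf.array();
    }
}

// RocksDB's default comparator sorts keys as unsigned bytes, so with this
// encoding non-negative user keys would iterate in numeric order (negative
// values would sort after positive ones unless the sign bit were flipped).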

Thanks!


On Thu, Apr 27, 2017 at 6:34 AM, Stefan Richter
<s.rich...@data-artisans.com> wrote:
> Hi,
>
> you can imagine the internals of keyed map state as working like a
> Map<EventKey, Map<UserKey, Value>>, but you only deal with the Map<UserKey,
> Value> part in your user code. Under the hood, Flink will always present you
> the map that corresponds to the currently processed event's key. So for each
> element, it will swap in the corresponding inner map, basically doing the
> lookup into the outer map by event key for you.
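>
> In user code that looks roughly like the following. This is only a minimal
> sketch (class, field, and state names are made up): the function runs on a
> stream keyed by the event key, and the MapState plays the role of the inner
> Map<UserKey, Value>.
>
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.MapState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.util.Collector;
>
> // input: Tuple2<eventKey, userKey>; emits a running count per (eventKey, userKey)
> public class CountPerUserKey
>         extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, Long>> {
>
>     private transient MapState<String, Long> counts; // the "inner" map
>
>     @Override
>     public void open(Configuration parameters) {
>         counts = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("counts", String.class, Long.class));
>     }
>
>     @Override
>     public void flatMap(Tuple2<String, String> in,
>                         Collector<Tuple2<String, Long>> out) throws Exception {
>         // 'counts' is already scoped to the current event key (in.f0),
>         // because this function runs downstream of keyBy(0).
>         Long current = counts.get(in.f1);
>         long updated = (current == null) ? 1L : current + 1L;
>         counts.put(in.f1, updated);
>         out.collect(Tuple2.of(in.f1, updated));
>     }
> }
>
> // usage (sketch): stream.keyBy(0).flatMap(new CountPerUserKey())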
>
> For operator state (which is state that is not scoped by key) there is
> currently no map state and also no RocksDB implementation. We might introduce
> this in the future, but until now it was never really required because large
> state is typically keyed. So what you can do is maintain e.g. a
> java.util.Map<UserKey, Value> yourself and write it to a ListState at
> checkpointing time. The list aspect of operator state is different from the
> keyed ListState: the list elements are the atomic units of state
> redistribution (think of scaling in or out). So you could store your complete
> map as one list element, or each entry as one list element, or anything in
> between - depending on whether and how your operator state can be re-sharded.
> You could take a look at FlinkKafkaConsumerBase::initializeState and
> FlinkKafkaConsumerBase::snapshotState as an example, where the Kafka partition
> offsets are the operator state and individual offsets become list elements so
> that they can be individually redistributed.
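>
> A rough sketch of that pattern follows (names are made up; it stores one list
> element per map entry, like the per-partition offsets in the Kafka consumer):
>
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.runtime.state.FunctionInitializationContext;
> import org.apache.flink.runtime.state.FunctionSnapshotContext;
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>
> public class MapBackedSink extends RichSinkFunction<Tuple2<String, Long>>
>         implements CheckpointedFunction {
>
>     private final Map<String, Long> localMap = new HashMap<>();
>     private transient ListState<Tuple2<String, Long>> checkpointed;
>
>     @Override
>     public void invoke(Tuple2<String, Long> value) throws Exception {
>         localMap.merge(value.f0, value.f1, Long::sum);
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
>         checkpointed.clear();
>         // one list element per map entry, so entries can be redistributed on rescaling
>         for (Map.Entry<String, Long> e : localMap.entrySet()) {
>             checkpointed.add(Tuple2.of(e.getKey(), e.getValue()));
>         }
>     }
>
>     @Override
>     public void initializeState(FunctionInitializationContext ctx) throws Exception {
>         checkpointed = ctx.getOperatorStateStore().getListState(
>                 new ListStateDescriptor<>("map-entries",
>                         TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
>         if (ctx.isRestored()) {
>             for (Tuple2<String, Long> e : checkpointed.get()) {
>                 localMap.put(e.f0, e.f1);
>             }
>         }
>     }
> }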
>
> Best,
> Stefan
>
>
>> On 26.04.2017 at 17:24, Sand Stone <sand.m.st...@gmail.com> wrote:
>>
>> To be clear, I like the direction Flink is going with state:
>> Queryable State, MapState, etc. MapState in particular is a great
>> feature, and I am trying to find more documentation and/or usage
>> patterns for it before I dive into the deep end of the code. As far
>> as I can tell, the key in MapState does not have to be associated with
>> the key of the keyed stream. So in theory, I should be able to use
>> MapState almost anywhere that accepts "RichXXX" functions.
>>
>> Also, I wonder if it makes sense to have a "global state" (stored in a
>> RocksDB backend) that is instantiated by the environment and maintained by
>> the JobManager. Sure, state access would go over RPC, but the database
>> lifetime would be managed by the Flink cluster. Right now I think I could
>> emulate this with a "long running" job that exposes queryable state.
>>
>> Thanks!
>>
>>
>>
>> On Wed, Apr 26, 2017 at 8:01 AM, Timo Walther <twal...@apache.org> wrote:
>>> Hi,
>>>
>>> you are right. There are some limitations on RichReduceFunctions on
>>> windows. Maybe the new AggregateFunction (`window.aggregate()`) could solve
>>> your problem: you can provide an accumulator, which is your custom state
>>> that you update for each record. I couldn't find a documentation page; it
>>> might be created in the next weeks after the feature freeze.
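>>>
>>> A rough sketch of what I mean (the feature is still new, so the exact
>>> interface may differ; the accumulator here is a (sum, count) pair used to
>>> compute a per-window average):
>>>
>>> import org.apache.flink.api.common.functions.AggregateFunction;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>
>>> public class AverageAggregate
>>>         implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {
>>>
>>>     @Override
>>>     public Tuple2<Long, Long> createAccumulator() {
>>>         return Tuple2.of(0L, 0L); // (sum, count)
>>>     }
>>>
>>>     @Override
>>>     public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
>>>         // the accumulator is the custom state updated for each record
>>>         return Tuple2.of(acc.f0 + value, acc.f1 + 1);
>>>     }
>>>
>>>     @Override
>>>     public Double getResult(Tuple2<Long, Long> acc) {
>>>         return acc.f1 == 0 ? 0.0 : acc.f0 / (double) acc.f1;
>>>     }
>>>
>>>     @Override
>>>     public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
>>>         return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
>>>     }
>>> }
>>>
>>> // usage (sketch): stream.keyBy(...).timeWindow(Time.minutes(1)).aggregate(new AverageAggregate())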
>>>
>>> Regarding MapState, I am looping in Stefan; maybe he can give you some
>>> advice here.
>>>
>>> Timo
>>>
>>>
>>>
>>>
>>> On 26/04/17 at 04:25, Sand Stone wrote:
>>>
>>>> Hi, Flink newbie here.
>>>>
>>>> I played with the API (built from GitHub master) and encountered some
>>>> issues, but I am not sure whether they are limitations or actually by
>>>> design:
>>>>     1. The data stream reduce method does not take a
>>>> RichReduceFunction. The code compiles but throws a runtime exception
>>>> when submitted. [My intent is to maintain a MapState, more below.]
>>>>
>>>>     2. Flink seems to be picky about where MapState is used at
>>>> runtime. MapState is restricted to keyed streams and cannot be used
>>>> with certain operators. However, I might need to maintain a MapState
>>>> as (persistent) keyed state for certain processing contexts. [I could
>>>> use an external KV store via the async I/O API, but I am hoping Flink
>>>> could maintain the (RocksDB) instances for me so I can avoid another
>>>> layer of external storage.]
>>>>
>>>> Any pointer to blog/doc/video is greatly appreciated.
>>>>
>>>> Thanks!
>>>
>>>
>>>
>
