Thanks Stefan. The logical data model of Map<EventKey, Map<UserKey, Value>> makes total sense. A related question: MapState supports iteration. What is the encoding format at the RocksDB layer? Or rather, how can a user control the user key encoding?
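To make the second part of the question concrete, here is a minimal sketch (my assumption, not code from the thread) of declaring the MapState with explicit serializers via the TypeSerializer-based MapStateDescriptor constructor, which I understand to be the knob that controls how the user key and value are turned into bytes; the class and state names below are made up:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.Map;

public class PerUserMapStateFn
        extends RichFlatMapFunction<Tuple2<Long, String>, String> {

    private transient MapState<Long, String> perUser;

    @Override
    public void open(Configuration parameters) {
        // The serializers passed here determine how the user key and value
        // are serialized before they reach the state backend.
        MapStateDescriptor<Long, String> desc = new MapStateDescriptor<>(
                "per-user",
                LongSerializer.INSTANCE,
                StringSerializer.INSTANCE);
        perUser = getRuntimeContext().getMapState(desc);
    }

    @Override
    public void flatMap(Tuple2<Long, String> value, Collector<String> out) throws Exception {
        perUser.put(value.f0, value.f1);
        // Whether this iteration comes back in numeric user-key order is
        // exactly the question: it depends on how the backend stores the
        // serialized user key.
        for (Map.Entry<Long, String> entry : perUser.entries()) {
            out.collect(entry.getKey() + "=" + entry.getValue());
        }
    }
}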
I assume the implementation uses a compound key format: EventKey + UserKey. Let's assume UserKey is an int or long. If big endian is used, iteration will return the UserKeys in the order in which they are stored in RocksDB. Thanks!

On Thu, Apr 27, 2017 at 6:34 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
> Hi,
>
> you can imagine the internals of keyed map state working like a Map<EventKey,
> Map<UserKey, Value>>, but you only deal with the Map<UserKey, Value> part in
> your user code. Under the hood, Flink will always present you the map that
> corresponds to the currently processed event's key. So for each element, it
> will always swap in the inner map, basically doing the lookup into the outer
> map by event key for you.
>
> For operator state (which is state that is not scoped by key) there are
> currently no map states and also no implementation for RocksDB. We might
> introduce this in the future, but until now it was never really required
> because large state is typically keyed. So what you can do is simply maintain
> e.g. a java.util.Map<UserKey, Value> yourself and write it to a ListState at
> checkpointing time. The list aspect in operator state is different from the
> keyed ListState: list elements form the atoms of state re-distribution (think
> of scaling in or out). So you could store your complete map as one list
> element, or each entry as one list element, or anything in between - depending
> on if and how your operator state can be re-sharded. You could take a look at
> FlinkKafkaConsumerBase::initializeState and FlinkKafkaConsumerBase::snapshotState
> as an example, where Kafka partition offsets are the operator state and
> individual offsets become list elements so that they can be individually
> redistributed.
>
> Best,
> Stefan
>
>
>> On 26.04.2017 at 17:24, Sand Stone <sand.m.st...@gmail.com> wrote:
>>
>> To be clear, I like the direction Flink is going with state: Queryable
>> State, MapState, etc. MapState in particular is a great feature, and I am
>> trying to find more documentation and/or usage patterns for it before I
>> dive into the deep end of the code. As far as I can tell, the key in
>> MapState does not have to be associated with the key of the keyed stream.
>> So in theory, I should be able to use MapState almost anywhere that
>> accepts "RichXXX" functions.
>>
>> Also, I wonder if it makes sense to have "global state" (stored in a
>> RocksDB backend) instantiated by the Env and maintained by the JobManager.
>> Sure, the state access is RPC, but the database lifetime is maintained by
>> the Flink cluster. Right now I think I could use a "long running" job that
>> exposes Queryable State to emulate this.
>>
>> Thanks!
>>
>>
>>
>> On Wed, Apr 26, 2017 at 8:01 AM, Timo Walther <twal...@apache.org> wrote:
>>> Hi,
>>>
>>> you are right. There are some limitations regarding RichReduceFunctions
>>> on windows. Maybe the new AggregateFunction (`window.aggregate()`) could
>>> solve your problem: you can provide an accumulator, which is your custom
>>> state that you can update for each record. I couldn't find a documentation
>>> page; it might be created in the next weeks after the feature freeze.
>>>
>>> Regarding the MapState, I'm looping in Stefan; maybe he can give you some
>>> advice here.
>>>
>>> Timo
>>>
>>>
>>>
>>> On 26/04/17 at 04:25, Sand Stone wrote:
>>>
>>>> Hi, Flink newbie here.
>>>>
>>>> I played with the API (built from GitHub master) and encountered some
>>>> issues, but I am not sure if they are limitations or actually by design:
>>>>
>>>> 1. The data stream reduce method does not take a RichReduceFunction.
>>>> The code compiles but throws a runtime exception when submitted. [My
>>>> intent is to maintain a MapState, more below.]
>>>>
>>>> 2. Flink seems to be picky about where MapState is used at runtime.
>>>> MapState is restricted to keyed streams and cannot be used with certain
>>>> operators. However, I might need to maintain a MapState as certain
>>>> (persistent) keyed state for processing contexts. [I could use an
>>>> external kv store via the async I/O API, but I am hoping Flink could
>>>> help maintain the (RocksDB) db instances so I could avoid another layer
>>>> of external store.]
>>>>
>>>> Any pointer to blog/doc/video is greatly appreciated.
>>>>
>>>> Thanks!
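For reference, a rough sketch of the operator state pattern Stefan describes above: keep a plain java.util.Map in the operator and write its entries to an operator ListState at checkpointing time, one entry per list element so they can be redistributed individually on rescaling. The class and state names are made up, and this is only loosely modeled on what FlinkKafkaConsumerBase does with partition offsets, not taken from it:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MapBackedOperatorState extends RichSinkFunction<Tuple2<String, Long>>
        implements CheckpointedFunction {

    // the "working" state, kept on the heap between checkpoints
    private final Map<String, Long> localMap = new HashMap<>();

    private transient ListState<Tuple2<String, Long>> checkpointedEntries;

    @Override
    public void invoke(Tuple2<String, Long> value) {
        localMap.put(value.f0, value.f1);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // write the current map content into the list state, one entry per element
        checkpointedEntries.clear();
        for (Map.Entry<String, Long> e : localMap.entrySet()) {
            checkpointedEntries.add(Tuple2.of(e.getKey(), e.getValue()));
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Long>> descriptor = new ListStateDescriptor<>(
                "map-entries",
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}));

        checkpointedEntries = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            // rebuild the heap map from whatever entries were assigned to this subtask
            for (Tuple2<String, Long> entry : checkpointedEntries.get()) {
                localMap.put(entry.f0, entry.f1);
            }
        }
    }
}

The granularity choice follows Stefan's point: one entry per list element lets the entries be re-sharded individually, whereas storing the whole map as a single element keeps it together on one subtask after rescaling.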