Hi,

Using a MapState is a workaround that should work, but it would be nice if ListState would work for state that is too big to fit into memory.

Best,
Aljoscha

> On 13. Dec 2017, at 17:40, Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Aljoscha,
>
> thanks for reply. Do you see any issues in implementing the list state the
> way Fabian suggested (i.e. using the MapState)? I feel there are some open
> questions, mostly because the InternalListState (which I suppose the
> RocksDBListState should implement) extends InternalKvState, which in turn
> suggests, that the correct implementation *should* behave exactly as the
> current implementation does - serialize the list into one field and store as
> key-value. Do you think there would be any major issues with this?
>
> Many thanks,
>
> Jan
>
>
> On 12/13/2017 02:22 PM, Aljoscha Krettek wrote:
>> Hi,
>>
>> If I remember correctly, there was actually an effort to change the RocksDB
>> list state the way you described. I'm cc'ing Stephan, who was involved in
>> that and this is the Jira issue:
>> https://issues.apache.org/jira/browse/FLINK-5756
>>
>> Best,
>> Aljoscha
>>
>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU
>>> <ovidiu-cristian.ma...@inria.fr> wrote:
>>>
>>> Hi Jan,
>>>
>>> You could associate a key to each element of your Key's list (e.g., hashing
>>> the value), keep only the keys in heap (e.g., in a list) and the associated
>>> state key-value/s in an external store like RocksDB/Redis, but you will
>>> notice large overheads due to de/serializing - a huge penalty for more than
>>> 1000s of elements (see https://hal.inria.fr/hal-01530744/document for some
>>> experimental settings) for relatively small rate of new events per Key, if
>>> needed to process all values of a Key for each new event. Best case you can
>>> do some incremental processing unless your non-combining means
>>> non-associative operations per Key.
>>>
>>> Best,
>>> Ovidiu
>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi Fabian,
>>>>
>>>> thanks for quick reply, what you suggest seems to work at first sight, I
>>>> will try it. Is there any reason not to implement a RocksDBListState this
>>>> way in general? Is there any increased overhead of this approach?
>>>>
>>>> Thanks,
>>>>
>>>> Jan
>>>>
>>>>
>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>> Hi Jan,
>>>>>
>>>>> I cannot comment on the internal design, but you could put the data into a
>>>>> RocksDBStateBackend MapState<Integer, X> where the value X is your data
>>>>> type and the key is the list index. You would need another ValueState for
>>>>> the current number of elements that you put into the MapState.
>>>>> A MapState allows to fetch and traverse the key, value, or entry set of
>>>>> the Map without loading it completely into memory.
>>>>> The sets are traversed in sort order of the key, so should be in insertion
>>>>> order (given that you properly increment the list index).
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have a question that appears as a user@ question, but brought me into
>>>>>> the dev@ mailing list while I was browsing through the Flink's source
>>>>>> codes. First I'll try to briefly describe my use case. I'm trying to do a
>>>>>> group-by-key operation with a limited number of distinct keys (which I
>>>>>> cannot control), but a non trivial count of values. The operation in the
>>>>>> GBK is non-combining, so that all values per key (many) have to be stored
>>>>>> in a state. Running this on testing data led to a surprise (for me), that
>>>>>> even when using RocksDBStateBackend, the whole list of data is serialized
>>>>>> into single binary blob and then deserialized into List, and therefore
>>>>>> has to fit in memory (multiple times, in fact).
>>>>>>
>>>>>> I tried to create an alternative RocksDBStateBackend, that would store
>>>>>> each element of list in ListState to a separate key in RocksDB, so that
>>>>>> the whole blob would not have to be loaded by a single get, but a scan
>>>>>> over multiple keys could be made. Digging into the source code I found
>>>>>> there was a hierarchy of classes mirroring the public API in 'internal'
>>>>>> package - InternalKvState, InternalMergingState, InternalListState, and
>>>>>> so on. These classes however have different hierarchy than the public API
>>>>>> classes that they mirror, most notably InternalKvState is superinterface
>>>>>> of all others.
>>>>>> This fact seems to be used on multiple places throughout the source code.
>>>>>>
>>>>>> My question is - is this intentional? Would it be possible to store each
>>>>>> element of a ListState in a separate key in RocksDB (probably by adding
>>>>>> some suffix to the actual key of the state for each element)? What are
>>>>>> the pitfalls? And is it necessary for the InternalListState to be
>>>>>> actually subinterface of InternalKvState? I find this to be a related
>>>>>> problem.
>>>>>>
>>>>>> Many thanks for any comments or thoughts,
>>>>>>
>>>>>> Jan
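
For reference, the layout Fabian describes in the thread (a map keyed by list index plus a separate element counter) can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink code: the TreeMap stands in for a MapState<Integer, X>, the int field stands in for the ValueState holding the element count, and the class name IndexedListSketch is hypothetical. Whether a real backend iterates entries in numeric index order depends on how keys are serialized, which this sketch does not model.

```java
import java.util.Iterator;
import java.util.TreeMap;

/**
 * Sketch of a list emulated by an index-keyed sorted map plus a counter.
 * Assumption: the TreeMap plays the role of MapState<Integer, X> and the
 * int field plays the role of a ValueState<Integer>; neither is Flink API.
 */
final class IndexedListSketch<X> implements Iterable<X> {

    // Stand-in for the MapState: list index -> element, sorted by index.
    private final TreeMap<Integer, X> entries = new TreeMap<>();

    // Stand-in for the ValueState tracking how many elements were added.
    private int size = 0;

    /** Appends one element under the next free index, then bumps the counter. */
    void add(X value) {
        entries.put(size, value);
        size++;
    }

    /** Number of elements appended so far. */
    int size() {
        return size;
    }

    /** Walks elements in index order, which equals insertion order here. */
    @Override
    public Iterator<X> iterator() {
        return entries.values().iterator();
    }

    /** Drops all elements and resets the counter. */
    void clear() {
        entries.clear();
        size = 0;
    }

    public static void main(String[] args) {
        IndexedListSketch<String> list = new IndexedListSketch<>();
        list.add("first");
        list.add("second");
        list.add("third");
        for (String element : list) {
            System.out.println(element);
        }
    }
}
```

The point of the layout is that an append touches one entry and the counter, and a traversal reads one entry at a time, so nothing requires the whole list to be materialized as a single value.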