Thanks a lot!

On Tue, Dec 19, 2017 at 11:08 PM, Jan Lukavský <je...@seznam.cz> wrote:
> Hi,
>
> I filed a JIRA issue and pushed a PR for this.
>
> https://issues.apache.org/jira/browse/FLINK-8297
>
> Best,
>
> Jan
>
> On 12/14/2017 11:13 AM, Stephan Ewen wrote:
>
>> Hi Jan!
>>
>> One could implement the RocksDB ListState like you suggested.
>>
>> We did it the current way because that pattern is actually quite efficient
>> if your list fits into memory: the append is constant-time, and the values
>> are only concatenated the first time the list is accessed. Especially for
>> typical windowing patterns (frequent append(), occasional get()) this
>> works quite well.
>>
>> It falls short when the lists get too large, that is correct. Breaking the
>> list into individual elements means list.get() needs a range iterator,
>> which I think is a bit more costly. It also needs a nifty way to encode a
>> 'position' number into the key so that the list remains ordered, without
>> an extra read-modify-write of state every time this number is updated.
>>
>> But all in all, it should be possible. Are you interested in working on
>> something like that and contributing it?
>>
>> Best,
>> Stephan
>>
>> On Wed, Dec 13, 2017 at 2:22 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> If I remember correctly, there was actually an effort to change the
>>> RocksDB list state the way you described. I'm cc'ing Stephan, who was
>>> involved in that, and this is the Jira issue:
>>> https://issues.apache.org/jira/browse/FLINK-5756
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <ovidiu-cristian.ma...@inria.fr> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> You could associate a key with each element of your Key's list (e.g., by
>>>> hashing the value), keep only the keys in heap (e.g., in a list) and the
>>>> associated key-value state in an external store like RocksDB/Redis, but
>>>> you will notice large overheads due to de/serializing, a huge penalty for
>>>> more than 1000s of elements (see
>>>> https://hal.inria.fr/hal-01530744/document for some experimental
>>>> settings), for a relatively small rate of new events per Key, if you need
>>>> to process all values of a Key for each new event. Best case, you can do
>>>> some incremental processing, unless your non-combining means
>>>> non-associative operations per Key.
>>>>
>>>> Best,
>>>> Ovidiu
>>>>
>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> thanks for the quick reply, what you suggest seems to work at first
>>>>> sight, I will try it. Is there any reason not to implement a
>>>>> RocksDBListState this way in general? Is there any increased overhead
>>>>> to this approach?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jan
>>>>>
>>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>>
>>>>>> Hi Jan,
>>>>>>
>>>>>> I cannot comment on the internal design, but you could put the data
>>>>>> into a RocksDBStateBackend MapState<Integer, X>, where the value X is
>>>>>> your data type and the key is the list index. You would need another
>>>>>> ValueState for the current number of elements that you put into the
>>>>>> MapState.
>>>>>> A MapState allows you to fetch and traverse the key, value, or entry
>>>>>> set of the Map without loading it completely into memory.
>>>>>> The sets are traversed in sort order of the key, so they should be in
>>>>>> insertion order (given that you properly increment the list index).
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have a question that appears as a user@ question, but it brought me
>>>>>>> to the dev@ mailing list while I was browsing through Flink's source
>>>>>>> code. First I'll try to briefly describe my use case. I'm trying to do
>>>>>>> a group-by-key operation with a limited number of distinct keys (which
>>>>>>> I cannot control), but a non-trivial count of values. The operation in
>>>>>>> the GBK is non-combining, so all values per key (many) have to be
>>>>>>> stored in state. Running this on testing data led to a surprise (for
>>>>>>> me): even when using RocksDBStateBackend, the whole list of data is
>>>>>>> serialized into a single binary blob and then deserialized into a List,
>>>>>>> and therefore has to fit in memory (multiple times, in fact).
>>>>>>>
>>>>>>> I tried to create an alternative RocksDBStateBackend that would store
>>>>>>> each element of a list in ListState under a separate key in RocksDB, so
>>>>>>> that the whole blob would not have to be loaded by a single get, but a
>>>>>>> scan over multiple keys could be made instead. Digging into the source
>>>>>>> code I found that there is a hierarchy of classes mirroring the public
>>>>>>> API in the 'internal' package: InternalKvState, InternalMergingState,
>>>>>>> InternalListState, and so on. These classes, however, have a different
>>>>>>> hierarchy than the public API classes that they mirror, most notably
>>>>>>> InternalKvState is a superinterface of all the others. This fact seems
>>>>>>> to be used in multiple places throughout the source code.
>>>>>>>
>>>>>>> My question is: is this intentional? Would it be possible to store each
>>>>>>> element of a ListState under a separate key in RocksDB (probably by
>>>>>>> adding some suffix to the actual key of the state for each element)?
>>>>>>> What are the pitfalls? And is it necessary for the InternalListState to
>>>>>>> actually be a subinterface of InternalKvState? I find this to be a
>>>>>>> related problem.
>>>>>>>
>>>>>>> Many thanks for any comments or thoughts,
>>>>>>>
>>>>>>> Jan
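
For reference, a minimal sketch of the MapState-based workaround Fabian describes above. The class and state names are illustrative (not from the thread), String stands in for the actual element type, and it assumes the function is applied on a KeyedStream so that keyed state is available:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class PerKeyListFunction extends RichFlatMapFunction<String, String> {

  // One map entry per list element; the map key is the element's list index.
  private transient MapState<Integer, String> elements;
  // Number of elements appended so far for the current key.
  private transient ValueState<Integer> size;

  @Override
  public void open(Configuration parameters) {
    elements = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("elements", Integer.class, String.class));
    size = getRuntimeContext().getState(
        new ValueStateDescriptor<>("size", Integer.class));
  }

  @Override
  public void flatMap(String value, Collector<String> out) throws Exception {
    // "Append": write the element under the next free index.
    Integer n = size.value();
    int index = (n == null) ? 0 : n;
    elements.put(index, value);
    size.update(index + 1);

    // "Get": traverse the values lazily instead of materializing one big list.
    for (String element : elements.values()) {
      out.collect(element);
    }
  }
}

With the RocksDB state backend, each map entry lives under its own RocksDB key, so iterating elements.values() scans entries rather than deserializing a single blob. The trade-off is one serialized entry per element plus the extra ValueState update on every append.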
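A small illustration of the 'position number in the key' idea Stephan mentions. This is not Flink's actual internal key format, just a sketch of how a fixed-width, big-endian position suffix keeps the elements of one list contiguous and in append order under RocksDB's lexicographic byte ordering:

import java.nio.ByteBuffer;

public class ListElementKey {

  // Composite key = serialized (key group, key, namespace) prefix followed by
  // an 8-byte big-endian position. For non-negative positions, big-endian
  // encoding preserves numeric order under byte-wise comparison, so a prefix
  // scan returns the elements of a list in the order they were appended.
  public static byte[] compose(byte[] keyAndNamespacePrefix, long position) {
    return ByteBuffer.allocate(keyAndNamespacePrefix.length + Long.BYTES)
        .put(keyAndNamespacePrefix)
        .putLong(position)
        .array();
  }
}

The open question Stephan raises remains with this layout: the backend still needs to know the next position for an append without doing a read-modify-write of a counter on every add().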