Hi,

Using a MapState is a workaround that should work, but it would be nice if 
ListState itself worked for state that is too big to fit into memory.

Best,
Aljoscha

> On 13. Dec 2017, at 17:40, Jan Lukavský <je...@seznam.cz> wrote:
> 
> Hi Aljoscha,
> 
> thanks for the reply. Do you see any issues with implementing the list state 
> the way Fabian suggested (i.e. using the MapState)? I feel there are some 
> open questions, mostly because InternalListState (which I suppose the 
> RocksDBListState should implement) extends InternalKvState, which in turn 
> suggests that the correct implementation *should* behave exactly as the 
> current one does - serialize the list into a single field and store it as 
> one key-value pair. Do you think there would be any major issues with this?
> 
> Many thanks,
> 
>  Jan
> 
> 
> On 12/13/2017 02:22 PM, Aljoscha Krettek wrote:
>> Hi,
>> 
>> If I remember correctly, there was actually an effort to change the RocksDB 
>> list state in the way you described. I'm cc'ing Stephan, who was involved 
>> in that. This is the Jira issue: 
>> https://issues.apache.org/jira/browse/FLINK-5756
>> 
>> Best,
>> Aljoscha
>> 
>>> On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU 
>>> <ovidiu-cristian.ma...@inria.fr> wrote:
>>> 
>>> Hi Jan,
>>> 
>>> You could associate a key with each element of your Key's list (e.g., by 
>>> hashing the value), keep only those keys on the heap (e.g., in a list), and 
>>> store the associated key-value pairs in an external store like RocksDB or 
>>> Redis. However, you will notice large overheads due to de/serialization - a 
>>> huge penalty beyond thousands of elements (see 
>>> https://hal.inria.fr/hal-01530744/document for some experimental settings), 
>>> even for a relatively small rate of new events per Key, if you need to 
>>> process all values of a Key for each new event. In the best case you can do 
>>> some incremental processing, unless your non-combining operation implies 
>>> non-associative operations per Key.
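>>> 
>>> A tiny sketch of that layout, using Redis via Jedis only as an example 
>>> client (the key scheme and values are illustrative; in practice each value 
>>> would be your serialized element, which is where the de/serialization cost 
>>> comes from):
>>> 
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> import redis.clients.jedis.Jedis;
>>> 
>>> public class ExternalListSketch {
>>> 
>>>     public static void main(String[] args) {
>>>         // the (small) keys stay on the heap ...
>>>         List<String> keysInHeap = new ArrayList<>();
>>> 
>>>         try (Jedis redis = new Jedis("localhost", 6379)) {
>>>             for (int i = 0; i < 5; i++) {
>>>                 // ... while the (large) values live in the external store
>>>                 String elementKey = "myKey:" + i; // or a hash of the value
>>>                 redis.set(elementKey, "element-" + i);
>>>                 keysInHeap.add(elementKey);
>>> 
>>>                 // processing all values of the Key on each new event
>>>                 // re-reads and deserializes everything - the overhead
>>>                 // mentioned above
>>>                 for (String key : keysInHeap) {
>>>                     System.out.println(redis.get(key));
>>>                 }
>>>             }
>>>         }
>>>     }
>>> }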
>>> 
>>> Best,
>>> Ovidiu
>>>> On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:
>>>> 
>>>> Hi Fabian,
>>>> 
>>>> thanks for the quick reply. What you suggest seems to work at first sight; 
>>>> I will try it. Is there any reason not to implement RocksDBListState this 
>>>> way in general? Is there any increased overhead with this approach?
>>>> 
>>>> Thanks,
>>>> 
>>>> Jan
>>>> 
>>>> 
>>>> On 12/12/2017 11:17 AM, Fabian Hueske wrote:
>>>>> Hi Jan,
>>>>> 
>>>>> I cannot comment on the internal design, but you could put the data into 
>>>>> a RocksDBStateBackend MapState<Integer, X>, where the value X is your 
>>>>> data type and the key is the list index. You would need another 
>>>>> ValueState for the current number of elements that you have put into the 
>>>>> MapState.
>>>>> A MapState allows you to fetch and traverse the key, value, or entry set 
>>>>> of the Map without loading it completely into memory.
>>>>> The sets are traversed in the sort order of the key, so entries should 
>>>>> come back in insertion order (given that you properly increment the list 
>>>>> index).
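>>>>> 
>>>>> A minimal sketch of that approach, assuming a keyed stream running on the 
>>>>> RocksDBStateBackend (the class and state names are illustrative, and the 
>>>>> element type is just String to keep it self-contained):
>>>>> 
>>>>> import java.util.Map;
>>>>> 
>>>>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>>>>> import org.apache.flink.api.common.state.MapState;
>>>>> import org.apache.flink.api.common.state.MapStateDescriptor;
>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>> import org.apache.flink.configuration.Configuration;
>>>>> import org.apache.flink.util.Collector;
>>>>> 
>>>>> public class MapBackedListFunction extends RichFlatMapFunction<String, String> {
>>>>> 
>>>>>     // one state entry per list element: list index -> element
>>>>>     private transient MapState<Integer, String> elements;
>>>>>     // how many elements were appended so far (the next free index)
>>>>>     private transient ValueState<Integer> size;
>>>>> 
>>>>>     @Override
>>>>>     public void open(Configuration parameters) {
>>>>>         elements = getRuntimeContext().getMapState(
>>>>>                 new MapStateDescriptor<>("elements", Integer.class, String.class));
>>>>>         size = getRuntimeContext().getState(
>>>>>                 new ValueStateDescriptor<>("size", Integer.class));
>>>>>     }
>>>>> 
>>>>>     @Override
>>>>>     public void flatMap(String value, Collector<String> out) throws Exception {
>>>>>         // append: store the element under the next free index
>>>>>         Integer count = size.value();
>>>>>         int index = count == null ? 0 : count;
>>>>>         elements.put(index, value);
>>>>>         size.update(index + 1);
>>>>> 
>>>>>         // traverse lazily: entries are fetched as the iterator advances,
>>>>>         // so the whole "list" never has to fit into memory at once
>>>>>         for (Map.Entry<Integer, String> entry : elements.entries()) {
>>>>>             out.collect(entry.getValue());
>>>>>         }
>>>>>     }
>>>>> }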
>>>>> 
>>>>> Best, Fabian
>>>>> 
>>>>> 2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> I have a question that looks like a user@ question, but it brought me to 
>>>>>> the dev@ mailing list while I was browsing through Flink's source code. 
>>>>>> First I'll try to briefly describe my use case. I'm trying to do a 
>>>>>> group-by-key operation with a limited number of distinct keys (which I 
>>>>>> cannot control), but a non-trivial count of values. The operation in the 
>>>>>> GBK is non-combining, so all values per key (many of them) have to be 
>>>>>> stored in state. Running this on test data led to a surprise (for me): 
>>>>>> even when using the RocksDBStateBackend, the whole list of data is 
>>>>>> serialized into a single binary blob and then deserialized into a List, 
>>>>>> and therefore has to fit in memory (multiple times, in fact).
>>>>>> 
>>>>>> I tried to create an alternative RocksDBStateBackend that would store 
>>>>>> each element of a ListState under a separate key in RocksDB, so that the 
>>>>>> whole blob would not have to be loaded by a single get; instead, a scan 
>>>>>> over multiple keys could be made. Digging into the source code, I found 
>>>>>> a hierarchy of classes in the 'internal' package mirroring the public 
>>>>>> API - InternalKvState, InternalMergingState, InternalListState, and so 
>>>>>> on. These classes, however, have a different hierarchy than the public 
>>>>>> API classes they mirror; most notably, InternalKvState is a 
>>>>>> superinterface of all the others. This fact seems to be relied upon in 
>>>>>> multiple places throughout the source code.
>>>>>> 
>>>>>> My question is - is this intentional? Would it be possible to store each 
>>>>>> element of a ListState under a separate key in RocksDB (probably by 
>>>>>> adding some suffix to the actual key of the state for each element)? 
>>>>>> What are the pitfalls? And is it necessary for InternalListState to 
>>>>>> actually be a subinterface of InternalKvState? I find this to be a 
>>>>>> related problem.
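>>>>>> 
>>>>>> To make the suffix idea concrete, here is a rough standalone sketch of 
>>>>>> the kind of key layout and scan I have in mind (plain RocksDB JNI API; 
>>>>>> the '#' separator and the big-endian index are only an illustration, not 
>>>>>> Flink's actual key encoding):
>>>>>> 
>>>>>> import java.nio.charset.StandardCharsets;
>>>>>> import java.util.Arrays;
>>>>>> 
>>>>>> import org.rocksdb.Options;
>>>>>> import org.rocksdb.RocksDB;
>>>>>> import org.rocksdb.RocksIterator;
>>>>>> 
>>>>>> public class ListStatePrefixScan {
>>>>>> 
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         RocksDB.loadLibrary();
>>>>>>         try (Options opts = new Options().setCreateIfMissing(true);
>>>>>>              RocksDB db = RocksDB.open(opts, "/tmp/list-state-sketch")) {
>>>>>> 
>>>>>>             // each list element lives under <stateKey> '#' <elementIndex>
>>>>>>             byte[] prefix = "stateKey#".getBytes(StandardCharsets.UTF_8);
>>>>>>             for (int i = 0; i < 3; i++) {
>>>>>>                 db.put(concat(prefix, intToBytes(i)),
>>>>>>                        ("element-" + i).getBytes(StandardCharsets.UTF_8));
>>>>>>             }
>>>>>> 
>>>>>>             // read the list back with a scan instead of one big get()
>>>>>>             try (RocksIterator it = db.newIterator()) {
>>>>>>                 for (it.seek(prefix);
>>>>>>                      it.isValid() && startsWith(it.key(), prefix);
>>>>>>                      it.next()) {
>>>>>>                     System.out.println(
>>>>>>                             new String(it.value(), StandardCharsets.UTF_8));
>>>>>>                 }
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>> 
>>>>>>     // big-endian encoding so the scan returns elements in insertion order
>>>>>>     private static byte[] intToBytes(int i) {
>>>>>>         return new byte[] {
>>>>>>                 (byte) (i >>> 24), (byte) (i >>> 16), (byte) (i >>> 8), (byte) i};
>>>>>>     }
>>>>>> 
>>>>>>     private static byte[] concat(byte[] a, byte[] b) {
>>>>>>         byte[] result = Arrays.copyOf(a, a.length + b.length);
>>>>>>         System.arraycopy(b, 0, result, a.length, b.length);
>>>>>>         return result;
>>>>>>     }
>>>>>> 
>>>>>>     private static boolean startsWith(byte[] key, byte[] prefix) {
>>>>>>         return key.length >= prefix.length
>>>>>>                 && Arrays.equals(Arrays.copyOf(key, prefix.length), prefix);
>>>>>>     }
>>>>>> }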
>>>>>> 
>>>>>> Many thanks for any comments or thoughts,
>>>>>> 
>>>>>> Jan
>>>>>> 
>>>>>> 
>> 
> 
