Hi Stephan,

yes, definitely. I have put together a POC implementation that seems to work for my use case (not yet tested for performance, though). I have also opened a PR, just for discussion of the topic, here:

https://github.com/datadrivencz/flink/pull/1/

I know that the PR doesn't follow the guidelines for submitting PRs, but I still consider it a WIP and its purpose is just to agree upon the implementation details. Would you find a few moments to walk through it?

Or should I file a JIRA and have the discussion there? No problem with that, I just thought that discussion directly with the code could be more productive in this case.

Thanks,

 Jan


On 12/14/2017 11:13 AM, Stephan Ewen wrote:
Hi Jan!

One could implement the RocksDB ListState like you suggested.

We did it the current way because that pattern is actually quite efficient
if your list fits into memory: the list append is constant-time, and the
values are only concatenated the first time the list is accessed.
Especially for typical windowing patterns (frequent append(), occasional
get()) this works quite well.

It falls short when the lists get too large, that is correct. To break it
into individual elements means to have a range iterator for list.get()
access which I think is a bit more costly. It also needs a nifty way to add
a 'position' number into the key to make sure the list remains ordered, and
to not have to have extra read-modify-write state every time this number is
updated.
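
To illustrate the 'position' number idea, here is a minimal sketch (not
code from the PR or from Flink). It assumes a sorted in-memory map as a
stand-in for RocksDB, string state keys where no key is a prefix of
another, and an in-memory counter per key. The class and method names are
hypothetical. The position is appended as a big-endian long, so the byte
order of the composite keys matches the insertion order and a range scan
returns the elements in sequence.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: one store entry per list element, keyed by (stateKey, position).
class PerElementListSketch {
    // Unsigned lexicographic byte order, which is how a byte-sorted store would compare keys.
    private static final Comparator<byte[]> UNSIGNED_BYTES = Arrays::compareUnsigned;

    // Stand-in for the key-value store (assumption: sorted by key bytes).
    private final TreeMap<byte[], String> store = new TreeMap<>(UNSIGNED_BYTES);

    // Next free position per state key. Kept in memory here; how to maintain it
    // cheaply in a real backend is exactly the open question raised above.
    private final Map<String, Long> nextPosition = new HashMap<>();

    // Composite key = state key bytes followed by the position as a big-endian long.
    static byte[] compositeKey(String stateKey, long position) {
        byte[] prefix = stateKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(prefix.length + Long.BYTES)
                .put(prefix)
                .putLong(position) // ByteBuffer is big-endian by default
                .array();
    }

    // Appends one element without reading or rewriting the existing elements.
    void append(String stateKey, String value) {
        long position = nextPosition.merge(stateKey, 1L, Long::sum) - 1;
        store.put(compositeKey(stateKey, position), value);
    }

    // Range scan: start at position 0 and stop at the first key of another state key.
    List<String> get(String stateKey) {
        byte[] prefix = stateKey.getBytes(StandardCharsets.UTF_8);
        List<String> result = new ArrayList<>();
        for (Map.Entry<byte[], String> e
                : store.tailMap(compositeKey(stateKey, 0L), true).entrySet()) {
            byte[] k = e.getKey();
            boolean sameStateKey = k.length == prefix.length + Long.BYTES
                    && Arrays.equals(k, 0, prefix.length, prefix, 0, prefix.length);
            if (!sameStateKey) {
                break;
            }
            result.add(e.getValue());
        }
        return result;
    }
}
```

The big-endian encoding matters: with a little-endian suffix, position 256
would sort before position 1 and the scan would no longer follow insertion
order.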

But all in all, it should be possible. Are you interested in working on
something like that and contributing it?

Best,
Stephan


On Wed, Dec 13, 2017 at 2:22 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi,

If I remember correctly, there was actually an effort to change the
RocksDB list state the way you described. I'm cc'ing Stephan, who was
involved in that and this is the Jira issue:
https://issues.apache.org/jira/browse/FLINK-5756

Best,
Aljoscha

On 12. Dec 2017, at 14:47, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:
Hi Jan,

You could associate a key with each element of your Key's list (e.g., by
hashing the value), keep only those keys on the heap (e.g., in a list), and
store the associated key-value state in an external store like
RocksDB/Redis. However, you will notice large overheads due to
de/serialization: a huge penalty for more than 1000s of elements (see
https://hal.inria.fr/hal-01530744/document for some experimental settings)
at a relatively small rate of new events per Key, if you need to process
all values of a Key for each new event. In the best case you can do some
incremental processing, unless by non-combining you mean non-associative
operations per Key.
Best,
Ovidiu
On 12 Dec 2017, at 11:54, Jan Lukavský <je...@seznam.cz> wrote:

Hi Fabian,

thanks for the quick reply. What you suggest seems to work at first sight;
I will try it. Is there any reason not to implement a RocksDBListState this
way in general? Is there any increased overhead with this approach?
Thanks,

Jan


On 12/12/2017 11:17 AM, Fabian Hueske wrote:
Hi Jan,

I cannot comment on the internal design, but you could put the data into a
RocksDBStateBackend MapState<Integer, X> where the value X is your data
type and the key is the list index. You would need another ValueState for
the current number of elements that you put into the MapState.
A MapState allows you to fetch and traverse the key, value, or entry set of
the Map without loading it completely into memory.
The sets are traversed in sort order of the key, so they should be in
insertion order (given that you properly increment the list index).
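
The pattern described above can be sketched roughly as follows. This is
only an illustration in plain Java so that it runs without Flink: the
TreeMap plays the role of the MapState<Integer, X>, the int field plays the
role of the ValueState holding the element count, and the class name is
hypothetical. In a real job both would be Flink state handles, and whether
traversal follows numeric index order depends on how the backend serializes
and orders the keys, so that is worth verifying.

```java
import java.util.Iterator;
import java.util.TreeMap;

// Hypothetical stand-in for the pattern: an index-keyed map plus a separate element counter.
class IndexedListSketch<X> implements Iterable<X> {
    // Plays the role of MapState<Integer, X>; a TreeMap keeps entries sorted by index.
    private final TreeMap<Integer, X> elements = new TreeMap<>();

    // Plays the role of the ValueState holding the current number of elements.
    private int count = 0;

    // Appends by writing a single new entry; existing entries are never touched.
    void add(X value) {
        elements.put(count, value);
        count = count + 1;
    }

    int size() {
        return count;
    }

    // Traverses the values in key (index) order, one element at a time.
    @Override
    public Iterator<X> iterator() {
        return elements.values().iterator();
    }
}
```

Each add() costs one map write plus one counter update, which is the extra
bookkeeping compared to a plain list append.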

Best, Fabian

2017-12-12 10:23 GMT+01:00 Jan Lukavský <je...@seznam.cz>:

Hi all,

I have a question that looks like a user@ question, but it brought me to
the dev@ mailing list while I was browsing through Flink's source code.
First I'll try to briefly describe my use case. I'm trying to do a
group-by-key operation with a limited number of distinct keys (which I
cannot control), but a non-trivial count of values. The operation in the
GBK is non-combining, so all values per key (many) have to be stored in
state. Running this on testing data led to a surprise (for me): even when
using RocksDBStateBackend, the whole list of data is serialized into a
single binary blob and then deserialized into a List, and therefore has to
fit in memory (multiple times, in fact).

I tried to create an alternative RocksDBStateBackend that would store each
element of the list in a ListState under a separate key in RocksDB, so that
the whole blob would not have to be loaded by a single get, but a scan over
multiple keys could be made instead. Digging into the source code I found
a hierarchy of classes mirroring the public API in the 'internal' package:
InternalKvState, InternalMergingState, InternalListState, and so on. These
classes, however, have a different hierarchy than the public API classes
that they mirror; most notably, InternalKvState is a superinterface of all
the others. This fact seems to be used in multiple places throughout the
source code. My question is: is this intentional? Would it be possible to
store each element of a ListState under a separate key in RocksDB (probably
by adding some suffix to the actual key of the state for each element)?
What are the pitfalls? And is it necessary for InternalListState to
actually be a subinterface of InternalKvState? I find this to be a related
problem.

Many thanks for any comments or thoughts,

Jan



