Hi Robin

I think you could record the size of your list under the current key with another
value state or operator state (store a Map of <key-by key, list length>, and
store the whole map in a list when snapshotting). If you do not have many key-by
keys, operator state is a good choice as it lives on-heap and is lightweight.
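
For illustration, a minimal sketch of the keyed-state variant of this idea,
assuming a KeyedProcessFunction over String elements; the state names and the
MAX_ELEMENTS limit are hypothetical, not from your job:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class BoundedListFunction extends KeyedProcessFunction<String, String, String> {
    private static final int MAX_ELEMENTS = 100_000; // hypothetical limit

    private transient ValueState<Integer> countState; // per-key element count
    private transient ListState<String> listState;    // the buffered elements

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Integer count = countState.value();
        int current = count == null ? 0 : count;
        if (current >= MAX_ELEMENTS) {
            return; // drop (or side-output) elements beyond the limit
        }
        listState.add(value);
        countState.update(current + 1);
    }
}
```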

Best
Yun Tang
________________________________
From: Robin Cassan <robin.cas...@contentsquare.com>
Sent: Friday, May 15, 2020 20:59
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: Protection against huge values in RocksDB List State

Hi Yun, thanks for your answer! And sorry I missed that limitation in the
documentation; it makes sense!
In our case, we are merging too many elements (each element is limited to
4 MiB in our Kafka topic). I agree we do not want our state to contain really
big values, which is why we are trying to find a way to put a limit on the
number (or total size) of elements that are aggregated in the window's state.
We have found a way to do this by using another session window placed before
the main one, which stores the number of messages for each key and rejects new
messages once we have reached a limit, but we are wondering if there is a
better way to achieve this without creating another state.
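
For reference, a rough sketch of the kind of gating step described above, using
a keyed pre-filter whose count expires after the session gap; the LIMIT and
GAP_MS values and all names are hypothetical:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts messages per key and drops everything past a limit; the count is
// reset by a processing-time timer that approximates the session gap.
public class CountingGate extends KeyedProcessFunction<String, String, String> {
    private static final int LIMIT = 10_000;            // hypothetical limit
    private static final long GAP_MS = 30 * 60 * 1000L; // hypothetical session gap

    private transient ValueState<Integer> count;
    private transient ValueState<Long> expiry; // latest scheduled reset time

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("gate-count", Integer.class));
        expiry = getRuntimeContext().getState(
                new ValueStateDescriptor<>("gate-expiry", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Integer c = count.value();
        int current = c == null ? 0 : c;
        if (current < LIMIT) {
            count.update(current + 1);
            out.collect(value);
        } // else: drop, or route to a side output for monitoring
        // push the reset forward so the count only clears after a quiet gap
        long resetAt = ctx.timerService().currentProcessingTime() + GAP_MS;
        ctx.timerService().registerProcessingTimeTimer(resetAt);
        expiry.update(resetAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long resetAt = expiry.value();
        if (resetAt != null && timestamp >= resetAt) { // ignore stale timers
            count.clear();
            expiry.clear();
        }
    }
}
```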

Thanks again,
Robin

On Thu, May 14, 2020 at 19:38, Yun Tang <myas...@live.com> wrote:
Hi Robin

First of all, the root cause is not that RocksDB cannot store large list state
when you merge, but the JNI limitation of 2^31 bytes [1].
Moreover, the RocksDB Java API does not return anything when you call the
merge [2] operator.
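
For example, a minimal standalone sketch against the RocksDB Java API (the DB
path and keys are illustrative) showing that merge() returns void, so a merge
operator failure has nothing to report into:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.StringAppendOperator;

public class MergeSignatureDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (StringAppendOperator append = new StringAppendOperator();
             Options options = new Options()
                     .setCreateIfMissing(true)
                     .setMergeOperator(append);
             RocksDB db = RocksDB.open(options, "/tmp/merge-demo")) {
            db.merge("k".getBytes(), "v1".getBytes()); // void: no status surfaced
            db.merge("k".getBytes(), "v2".getBytes()); // appended with the ',' delimiter
            System.out.println(new String(db.get("k".getBytes()))); // prints "v1,v2"
        }
    }
}
```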

Did you merge too many elements, or just elements that are individually too
big? Last but not least, even if you could merge such a large list, I don't
think fetching a value larger than 2^31 bytes would behave well.


[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
[2] https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382

Best
Yun Tang
________________________________
From: Robin Cassan <robin.cas...@contentsquare.com>
Sent: Friday, May 15, 2020 0:37
To: user <user@flink.apache.org>
Subject: Protection against huge values in RocksDB List State

Hi all!

I cannot seem to find any setting to limit the number of records appended to a
RocksDBListState, which is used when we combine SessionWindows with a ProcessFunction.
It seems that each incoming element is appended to the stored value with the
RocksDB `merge` operator, without any safeguard to make sure it doesn't grow
indefinitely. RocksDB merge operators can signal an error by returning false,
so I guess we could implement a limit that way, but since Flink uses the
"stringappendtest" merge operator (
https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
), it always returns true no matter what.

This is troublesome for us because it would make a lot of sense to specify an
acceptable limit on how many elements can be aggregated under a given key, and
because when we happen to have too many elements we get an exception from
RocksDB:
```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
    ... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
    ... 12 more
```

We are currently bypassing this by using a Reduce operator instead, which
ensures that we only store one element per key, but this degrades performance.
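
For illustration, a minimal sketch of such an eager reduce, here over String
payloads with a hypothetical MAX_BYTES cap (our real job uses its own types
and merge logic):

```java
import org.apache.flink.api.common.functions.ReduceFunction;

// Eagerly merges each incoming element into a single per-key value, so the
// RocksDB state never holds more than one element per key.
public class CappedMergingReduce implements ReduceFunction<String> {
    private static final int MAX_BYTES = 64 * 1024 * 1024; // hypothetical cap

    @Override
    public String reduce(String acc, String next) {
        if (acc.length() + next.length() > MAX_BYTES) {
            return acc; // stop growing once the cap is reached
        }
        return acc + next; // illustrative concatenation-style merge
    }
}
```

On a windowed stream this can be combined with a window function via
reduce(reduceFn, windowFn), so the window still gets a process-style hook
while RocksDB keeps a single value per key.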

Thanks for your input!
Robin
