Hi Robin,

I think you could record the size of your list under the current key with another value state or operator state (store a Map of <key-by key, list length>, and store the whole map in a list state when snapshotting). If you do not have many key-by keys, operator state is a good choice as it is on-heap and lightweight.
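Roughly what I mean, as a sketch (untested; the class name, limit, and key selector are invented, and you would still need to clear a key's count once its session window fires):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

// Drops elements for a key once too many have been seen, tracking the
// counts in an on-heap map that is saved as operator state on snapshots.
public class ListSizeGuard<T> extends RichFlatMapFunction<T, T>
        implements CheckpointedFunction {

    private static final int MAX_ELEMENTS = 10_000; // made-up limit

    private final KeySelector<T, String> keySelector;

    // On-heap map of <key-by key, current list length>.
    private final Map<String, Integer> lengths = new HashMap<>();

    private transient ListState<Map<String, Integer>> checkpointedLengths;

    public ListSizeGuard(KeySelector<T, String> keySelector) {
        this.keySelector = keySelector;
    }

    @Override
    public void flatMap(T value, Collector<T> out) throws Exception {
        String key = keySelector.getKey(value);
        int length = lengths.getOrDefault(key, 0);
        if (length < MAX_ELEMENTS) { // reject anything beyond the cap
            lengths.put(key, length + 1);
            out.collect(value);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Store the whole map in the operator list state when snapshotting.
        checkpointedLengths.clear();
        checkpointedLengths.add(lengths);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedLengths = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                        "list-lengths",
                        TypeInformation.of(new TypeHint<Map<String, Integer>>() {})));
        if (context.isRestored()) {
            for (Map<String, Integer> restored : checkpointedLengths.get()) {
                lengths.putAll(restored);
            }
        }
    }
}
```

One caveat: operator state is redistributed on rescaling, so after a rescale a subtask may not hold the counts for the keys it now processes.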
Best
Yun Tang

________________________________
From: Robin Cassan <robin.cas...@contentsquare.com>
Sent: Friday, May 15, 2020 20:59
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>
Subject: Re: Protection against huge values in RocksDB List State

Hi Yun, thanks for your answer! And sorry, I didn't see this limitation in the documentation. It makes sense!

In our case, we are merging too many elements (each element is limited to 4 MiB in our Kafka topic). I agree that we do not want our state to contain really big values, which is why we are trying to find a way to put a limit on the number (or total size) of elements that are aggregated in the state of the window.

We have found a way to do this by using another session window that is set before the main one, which stores the number of messages for each key and rejects new messages once a limit is reached, but we are wondering if there is a better way to achieve that without creating another state.

Thanks again,
Robin

On Thu, May 14, 2020 at 19:38, Yun Tang <myas...@live.com> wrote:

Hi Robin,

First of all, the root cause is not that RocksDB cannot store a large list state when you merge, but the JNI limitation of 2^31 bytes [1]. Moreover, RocksDB's Java API does not return anything when you call the merge [2] operator. Did you merge too many elements, or elements that are individually too large? Last but not least, even if you could merge such a large list, I think getting a value larger than 2^31 bytes would not behave well.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
[2] https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382

Best
Yun Tang

________________________________
From: Robin Cassan <robin.cas...@contentsquare.com>
Sent: Friday, May 15, 2020 0:37
To: user <user@flink.apache.org>
Subject: Protection against huge values in RocksDB List State

Hi all!

I cannot seem to find any setting to limit the number of records appended to a RocksDBListState, which is used when we use SessionWindows with a ProcessFunction. It seems that each incoming element is appended to the stored value with the RocksDB `merge` operator, without any safeguard to make sure the value doesn't grow indefinitely. RocksDB's merge seems to support returning false in case of error, so I guess we could implement a limit by returning false in the merge operator, but since Flink seems to use the "stringappendtest" merge operator (https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc), it always returns true no matter what.
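To make this concrete outside of Flink, here is a rough standalone RocksJava sketch (untested; it uses the generic StringAppendOperator from the Java API rather than the exact stringappendtest variant, but the unbounded-append behavior is what matters):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.StringAppendOperator;

public class MergeGrowthDemo {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                     .setCreateIfMissing(true)
                     .setMergeOperator(new StringAppendOperator());
             RocksDB db = RocksDB.open(options, "/tmp/merge-demo")) {
            byte[] key = "window-key".getBytes();
            for (int i = 0; i < 5; i++) {
                // Each merge() appends to the existing value; nothing ever
                // rejects the write, so the value grows with every element.
                db.merge(key, ("element-" + i).getBytes());
            }
            // get() must materialize the whole concatenated value as one
            // byte[], which is where a huge value eventually fails.
            System.out.println(new String(db.get(key)));
        }
    }
}
```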
This is troublesome for us because it would make a lot of sense to specify an acceptable limit on how many elements can be aggregated under a given key, and because when we happen to have too many elements we get an exception from RocksDB:

```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
	... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
	at org.rocksdb.RocksDB.get(Native Method)
	at org.rocksdb.RocksDB.get(RocksDB.java:810)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
	... 12 more
```

We are currently bypassing this by using a Reduce operator instead, which ensures that we only store one element per key, but this gives us degraded performance.

Thanks for your input!
Robin
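P.S. For reference, the Reduce workaround looks roughly like this (a simplified sketch; the Event type, merge logic, and session gap are made up):

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReduceWorkaround {

    // Hypothetical record: a key plus an accumulated payload.
    public static class Event {
        public String key = "k";
        public String payload = "";
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event> events = env.fromElements(new Event(), new Event()); // stand-in source

        events
                .keyBy(e -> e.key)
                .window(ProcessingTimeSessionWindows.withGap(Time.minutes(30))) // made-up gap
                .reduce((ReduceFunction<Event>) (a, b) -> {
                    // Combining eagerly means the backend keeps a single
                    // reduced value per key/window instead of a growing list.
                    a.payload = a.payload + b.payload;
                    return a;
                })
                .print();

        env.execute("reduce workaround sketch");
    }
}
```

Because the window then backs its contents with a ReducingState rather than a ListState, only one value per key is ever stored, which is what avoids the huge merge result.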