Hi Yun, thanks for your answer! And sorry I didn't spot this limitation in the documentation, it makes sense now! In our case we are merging too many elements rather than overly big ones (each element is capped at 4 MiB by our Kafka topic). I agree we do not want our state to contain really big values, which is why we are trying to find a way to cap the number (or total size) of elements that get aggregated in the window's state. We have found a way to do this by adding another session window in front of the main one: it stores the number of messages seen for each key and rejects new messages once a limit is reached. But we are wondering if there is a better way to achieve this without creating a second state, something roughly along the lines of the sketch below.
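For illustration, here is a rough sketch of the pre-filtering idea. It is written as a KeyedProcessFunction with a ValueState counter rather than a second session window, just to keep it short; the `Event` type, the class name and the limit are made up for the example:

```
// Sketch only: counts elements per key and drops anything past a cap,
// so the downstream session window never accumulates an unbounded list.
// In our real attempt the counter lives in a session window's state so
// that it expires with the session; this version never clears the count.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CapPerKey extends KeyedProcessFunction<String, Event, Event> {
    private static final int MAX_ELEMENTS = 10_000; // arbitrary cap

    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) {
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out)
            throws Exception {
        Integer count = countState.value();
        int current = (count == null) ? 0 : count;
        if (current < MAX_ELEMENTS) {
            countState.update(current + 1);
            out.collect(value); // forward to the session window
        }
        // otherwise drop (or side-output) the element
    }
}
```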
Thanks again,
Robin

On Thu, May 14, 2020 at 7:38 PM, Yun Tang <myas...@live.com> wrote:

> Hi Robin
>
> First of all, the root cause is not that RocksDB cannot store large list
> state when you merge, but the JNI limitation of 2^31 bytes [1].
> Moreover, the RocksDB Java API does not return anything when you call the
> merge [2] operator.
>
> Did you merge too many elements, or just elements that are individually
> too big? Last but not least, even if you could merge a large list, I think
> getting a value larger than 2^31 bytes would not behave well.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
> [2]
> https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382
>
> Best
> Yun Tang
> ------------------------------
> *From:* Robin Cassan <robin.cas...@contentsquare.com>
> *Sent:* Friday, May 15, 2020 0:37
> *To:* user <user@flink.apache.org>
> *Subject:* Protection against huge values in RocksDB List State
>
> Hi all!
>
> I cannot seem to find any setting to limit the number of records appended
> to a RocksDBListState, which is used when we combine SessionWindows with a
> ProcessFunction.
> It seems that, for each incoming element, the new element is appended to
> the value with the RocksDB `merge` operator, without any safeguard to
> make sure that it doesn't grow indefinitely. The RocksDB merge operator
> seems to support returning false in case of error, so I guess we could
> implement a limit by returning false in the merge operator, but since
> Flink seems to use the "stringappendtest" merge operator (
> https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
> ),
> true is always returned no matter what.
>
> This is troublesome for us because it would make a lot of sense to specify
> an acceptable limit on how many elements can be aggregated under a given
> key, and because when we happen to have too many elements we get an
> exception from RocksDB:
> ```
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while
> retrieving data from RocksDB
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
> at
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
> ... 7 more
> Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM
> limit
> at org.rocksdb.RocksDB.get(Native Method)
> at org.rocksdb.RocksDB.get(RocksDB.java:810)
> at
> org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
> ... 12 more
> ```
>
> We are currently bypassing this by using a Reduce operator instead, which
> ensures that we only store one element per key, but this gives us degraded
> performance.
>
> Thanks for your input!
> Robin
>
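For reference, the Reduce workaround mentioned at the end of the quoted message is along these lines. This is only a sketch, assuming a hypothetical `Event` type with a `combine` method; with `reduce()` the window keeps a single pre-aggregated value per key in a ReducingState instead of an ever-growing ListState:

```
// Sketch of the ReduceFunction workaround: elements are folded eagerly,
// so RocksDB stores one value per key/window instead of a growing list.
// `Event` and `Event.combine` are hypothetical placeholders.
import org.apache.flink.api.common.functions.ReduceFunction;

public class CombineEvents implements ReduceFunction<Event> {
    @Override
    public Event reduce(Event a, Event b) {
        return Event.combine(a, b);
    }
}
```

A typical usage would be `stream.keyBy(...).window(...).reduce(new CombineEvents(), new MyWindowFunction())`, where `MyWindowFunction` holds whatever per-window logic is still needed.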