Hi all!

I cannot seem to find any setting to limit the number of records appended to the RocksDBListState that is used when we combine SessionWindows with a ProcessFunction. It seems that each incoming element is appended to the existing value with the RocksDB `merge` operator, without any safeguard to ensure the value doesn't grow indefinitely. The RocksDB merge operator can return false to signal an error, so in principle a limit could be enforced by returning false once a threshold is reached, but since Flink seems to use the "stringappendtest" merge operator ( https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc ), it always returns true no matter what.
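For context, the setup that runs into this looks roughly like the following (a minimal, illustrative sketch, not our actual job; the types, source and gap duration are placeholders):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the real job this is an unbounded (key, count) stream.
        DataStream<Tuple2<String, Long>> events =
                env.fromElements(Tuple2.of("key-a", 1L), Tuple2.of("key-a", 2L));

        events
            .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                @Override
                public String getKey(Tuple2<String, Long> value) {
                    return value.f0;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
            .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                @Override
                public void process(String key, Context ctx,
                                    Iterable<Tuple2<String, Long>> elements,
                                    Collector<String> out) {
                    // With the RocksDB backend, 'elements' is backed by a RocksDBListState:
                    // every event of the session has been appended to a single RocksDB value
                    // via the merge operator, with nothing capping how large that value grows.
                    long count = 0;
                    for (Tuple2<String, Long> ignored : elements) {
                        count++;
                    }
                    out.collect(key + ": " + count + " events in session");
                }
            })
            .print();

        env.execute("session-window-list-state-sketch");
    }
}
```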
This is troublesome for us because it would make a lot of sense to be able to specify an acceptable limit on how many elements can be aggregated under a given key, and because when a key accumulates too many elements we get an exception from RocksDB:

```
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
    ... 7 more
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
    ... 12 more
```

We are currently bypassing this by using a Reduce operator instead (see the sketch in the PS below), which ensures that we only store one element per key, but this gives us degraded performance.

Thanks for your input!

Robin
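PS: in case it helps to see it, our reduce-based workaround looks roughly like this (again an illustrative sketch applied to the same stream shape as above, not our real code). reduce() keeps a single pre-aggregated value per key and window, so the RocksDB entry stays bounded, but we lose access to the individual elements and pay the reduction cost on every incoming record:

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ReduceWorkaroundSketch {

    // Same (key, count) stream shape as in the earlier sketch.
    static SingleOutputStreamOperator<Tuple2<String, Long>> sessionSums(
            DataStream<Tuple2<String, Long>> events) {
        return events
            .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                @Override
                public String getKey(Tuple2<String, Long> value) {
                    return value.f0;
                }
            })
            .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
            // reduce() is backed by a ReducingState, which stores one running value per
            // key/window instead of appending every element to a RocksDBListState.
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                    return Tuple2.of(a.f0, a.f1 + b.f1);
                }
            });
    }
}
```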