Great to hear that.

Best,
Congxian
Robin Cassan <robin.cas...@contentsquare.com> wrote on Wed, May 20, 2020 at 00:18:

> Hi Yun and Congxian!
> I have implemented a pre-filter that uses a keyed state (AggregatingState[Long]) that computes the size of all records seen for each key, which lets me filter out records that would be too big for the RocksDB JNI bridge. This seems to make our job behave better! Thanks for your help guys, this was really helpful :)
>
> Robin
>
> On Sat, May 16, 2020 at 09:05, Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi
>>
>> As you described, I'm not sure whether MapState can help you in such a case. MapState serializes each <mapKey, mapValue> pair separately, so it would not run into the same problem as ListState.
>>
>> When using MapState, you need to decide how to set the mapKey. If the whole state is cleared after being processed, you can use a monotonically increasing integer as the mapKey and store the last used mapKey in a value state.
>>
>> Best,
>> Congxian
>>
>> Yun Tang <myas...@live.com> wrote on Fri, May 15, 2020 at 22:31:
>>
>>> Hi Robin
>>>
>>> I think you could record the size of your list under the current key with another value state or operator state (store a Map of <key-by key, list length>, and store the whole map in a list when snapshotting). If you do not have many key-by keys, operator state is a good choice as it is on-heap and lightweight.
>>>
>>> Best
>>> Yun Tang
>>> ------------------------------
>>> *From:* Robin Cassan <robin.cas...@contentsquare.com>
>>> *Sent:* Friday, May 15, 2020 20:59
>>> *To:* Yun Tang <myas...@live.com>
>>> *Cc:* user <user@flink.apache.org>
>>> *Subject:* Re: Protection against huge values in RocksDB List State
>>>
>>> Hi Yun, thanks for your answer! And sorry, I didn't see this limitation in the documentation; it makes sense!
>>> In our case, we are merging too many elements (each individual element is limited to 4 MiB by our Kafka topic). I agree that we do not want our state to contain really big values, which is why we are trying to find a way to put a limit on the number (or total size) of elements that are aggregated in the state of the window.
>>> We have found a way to do this by using another session window placed before the main one, which stores the number of messages for each key and rejects new messages once a limit is reached, but we are wondering if there is a better way to achieve this without creating another state.
>>>
>>> Thanks again,
>>> Robin
>>>
>>> On Thu, May 14, 2020 at 19:38, Yun Tang <myas...@live.com> wrote:
>>>
>>> Hi Robin
>>>
>>> First of all, the root cause is not that RocksDB cannot store a large list state when you merge, but the JNI limitation of 2^31 bytes [1]. Moreover, the RocksDB Java merge [2] operator does not return anything when called.
>>>
>>> Did you merge too many elements, or elements that are individually too big? Last but not least, even if you could merge such a large list, I think reading back a value larger than 2^31 bytes would not behave well.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>>> [2] https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382
>>>
>>> Best
>>> Yun Tang
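For reference, a minimal sketch of the kind of size-capping pre-filter Robin describes at the top of the thread: a keyed function with an AggregatingState[Long] that sums the bytes seen per key and drops records once a cap is exceeded. The `Record` type, its `sizeInBytes` field, and the `SizeLimitFilter`/`SizeSum` names are made up for illustration; only the Flink state APIs are real, and this is not Robin's actual code.

```scala
import org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical record type: `sizeInBytes` stands in for however the real job
// measures the serialized size of one record.
case class Record(key: String, sizeInBytes: Long)

// Sums the sizes of all records seen so far for the current key.
class SizeSum extends AggregateFunction[Long, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(size: Long, acc: Long): Long = acc + size
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Keyed pre-filter placed before the session window: once the accumulated size
// for a key exceeds the cap, further records for that key are dropped, so the
// downstream RocksDB list state stays well below the 2^31-byte JNI limit.
class SizeLimitFilter(maxBytesPerKey: Long) extends RichFlatMapFunction[Record, Record] {

  private var accumulatedSize: AggregatingState[Long, Long] = _

  override def open(parameters: Configuration): Unit = {
    accumulatedSize = getRuntimeContext.getAggregatingState(
      new AggregatingStateDescriptor[Long, Long, Long](
        "accumulated-size", new SizeSum, createTypeInformation[Long]))
  }

  override def flatMap(record: Record, out: Collector[Record]): Unit = {
    accumulatedSize.add(record.sizeInBytes)        // update the running total for this key
    if (accumulatedSize.get() <= maxBytesPerKey) {
      out.collect(record)                          // still under the cap: forward the record
    }
    // over the cap: drop the record (a side output would make the drops observable)
  }
}
```

The filter would sit between a keyBy and the session window, e.g. `stream.keyBy(_.key).flatMap(new SizeLimitFilter(64L * 1024 * 1024))`. As written, the counting state is never cleared, so a real job would add state TTL or clear it once the key's session has been processed.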
>>> ------------------------------
>>> *From:* Robin Cassan <robin.cas...@contentsquare.com>
>>> *Sent:* Friday, May 15, 2020 0:37
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Protection against huge values in RocksDB List State
>>>
>>> Hi all!
>>>
>>> I cannot seem to find any setting to limit the number of records appended to the RocksDBListState that is used when we use SessionWindows with a ProcessFunction.
>>> It seems that, for each incoming element, the new element is appended to the value with the RocksDB `merge` operator, without any safeguard to make sure that it doesn't grow indefinitely. RocksDB merge seems to support returning false in case of error, so I guess we could implement a limit by returning false in the merge operator, but since Flink seems to use the "stringappendtest" merge operator (https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc), it always returns true no matter what.
>>>
>>> This is troublesome for us because it would make a lot of sense to specify an acceptable limit on how many elements can be aggregated under a given key, and because when we happen to have too many elements we get an exception from RocksDB:
>>> ```
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
>>> at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
>>> at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
>>> at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
>>> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
>>> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>> at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>> ... 7 more
>>> Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
>>> at org.rocksdb.RocksDB.get(Native Method)
>>> at org.rocksdb.RocksDB.get(RocksDB.java:810)
>>> at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
>>> ... 12 more
>>> ```
>>>
>>> We are currently bypassing this by using a Reduce operator instead, which ensures that we only store one element per key, but this gives us degraded performance.
>>>
>>> Thanks for your input!
>>> Robin
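And a minimal sketch of the MapState alternative Congxian suggests above: appending elements under a monotonically increasing integer mapKey, with the next free index kept in a ValueState. This only applies if the buffering is done in your own keyed function (e.g. a KeyedProcessFunction), since the window operator's internal ListState cannot be swapped out. The `MapStateBuffer` class and the String element type are made up, and the flush/timer logic is omitted.

```scala
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Buffers elements per key in a MapState keyed by a monotonically increasing
// index instead of appending them to a single ListState value. Each
// <index, element> pair is its own RocksDB entry, so no single read has to fit
// the whole buffer into one byte array. The flush logic (timers, session gaps,
// emitting results) is left out; only the append path is shown.
class MapStateBuffer extends KeyedProcessFunction[String, String, String] {

  private var nextIndex: ValueState[java.lang.Integer] = _
  private var buffer: MapState[java.lang.Integer, String] = _

  override def open(parameters: Configuration): Unit = {
    nextIndex = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Integer]("next-index", Types.INT))
    buffer = getRuntimeContext.getMapState(
      new MapStateDescriptor[java.lang.Integer, String]("buffer", Types.INT, Types.STRING))
  }

  override def processElement(
      element: String,
      ctx: KeyedProcessFunction[String, String, String]#Context,
      out: Collector[String]): Unit = {
    val current = nextIndex.value()                   // null on the first element for this key
    val index: Int = if (current == null) 0 else current.intValue()
    buffer.put(index, element)                        // one RocksDB entry per element
    nextIndex.update(index + 1)                       // remember the next free index
    // When the buffer is eventually processed (e.g. from a timer), iterate
    // buffer.values() entry by entry, then call buffer.clear() and
    // nextIndex.clear() so the next batch starts again from index 0.
  }
}
```

Because each <index, element> pair is stored and read back as a separate entry, iterating the buffer never has to materialize one multi-gigabyte byte array, which is what triggers the "Requested array size exceeds VM limit" error shown above.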