Hi Ori,

AFAIK, the 2GB limit is currently still there. As a workaround, maybe you can reduce the state size. If this cannot be done with the window operator, would the KeyedProcessFunction [1] work for you?
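To make "reduce the state size" a bit more concrete, here is a minimal, Flink-free sketch of the bounding logic I have in mind. It is only an illustration under assumptions: `BoundedKeyBuffer` and `maxBytesPerKey` are hypothetical names, the in-memory maps stand in for Flink keyed state, and in a real job this logic would live inside a KeyedProcessFunction, with the final flush triggered by an event-time timer. The idea is to emit a partial result as soon as the buffered bytes for a key reach a budget, so that no single key accumulates anywhere near 2GB.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not a Flink API): buffers events per key and
 * flushes early once a per-key byte budget is reached.
 */
final class BoundedKeyBuffer {
    private final long maxBytesPerKey;
    // Stand-ins for keyed state; in Flink these would be state handles.
    private final Map<String, List<byte[]>> buffers = new HashMap<>();
    private final Map<String, Long> sizes = new HashMap<>();

    BoundedKeyBuffer(long maxBytesPerKey) {
        this.maxBytesPerKey = maxBytesPerKey;
    }

    /**
     * Buffers one event. Returns the flushed batch if the budget for this
     * key was reached, otherwise an empty list.
     */
    List<byte[]> add(String key, byte[] event) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        long size = sizes.merge(key, (long) event.length, Long::sum);
        if (size >= maxBytesPerKey) {
            return flush(key); // early, partial emission
        }
        return Collections.emptyList();
    }

    /** Removes and returns everything buffered for the key (e.g. when the session ends). */
    List<byte[]> flush(String key) {
        sizes.remove(key);
        List<byte[]> out = buffers.remove(key);
        return out == null ? new ArrayList<>() : out;
    }

    public static void main(String[] args) {
        BoundedKeyBuffer buffer = new BoundedKeyBuffer(10);
        buffer.add("session-a", new byte[4]);
        buffer.add("session-a", new byte[4]);
        // The third event pushes the key to 12 bytes, so the batch is flushed.
        List<byte[]> flushed = buffer.add("session-a", new byte[4]);
        System.out.println("flushed events: " + flushed.size());
    }
}
```

Whether a partial emission is acceptable depends on what your window function computes; if it needs the whole session at once, pre-aggregating instead of buffering raw events may be the better way to shrink the state.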
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction

Best,
Congxian

On Wed, Jul 8, 2020 at 8:30 PM, Ori Popowski <ori....@gmail.com> wrote:

> I've asked this question in
> https://issues.apache.org/jira/browse/FLINK-9268 but it's been inactive
> for two years so I'm not sure it will be visible.
>
> While creating a savepoint I get a org.apache.flink.util.SerializedThrowable:
> java.lang.NegativeArraySizeException. It's happening because some of my
> windows have a keyed state of more than 2GiB, hitting RocksDB memory limit.
>
> How can I prevent this?
>
> As I understand it, I need somehow to limit the accumulated size of the
> window I'm using, which is EventTimeWindow. However, I have no way of
> doing so, because the WindowOperator manages its state on its own.
>
> Below is a full stack trace.
>
> org.apache.flink.util.SerializedThrowable: Could not materialize
> checkpoint 139 for operator Window(EventTimeSessionWindows(1800000),
> EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Flat Map -> Sink:
> Unnamed (23/189).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>     ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: null
>     at org.rocksdb.RocksIterator.value0(Native Method)
>     at org.rocksdb.RocksIterator.value(RocksIterator.java:50)
>     at org.apache.flink.contrib.streaming.state.RocksIteratorWrapper.value(RocksIteratorWrapper.java:102)
>     at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.value(RocksStatesPerKeyGroupMergeIterator.java:168)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:366)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
>     at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>     ... 5 common frames omitted