Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-13 Thread Ori Popowski
Hi, Eventually flatMapWithState solved the problem. I started by looking into KeyedProcessFunction, which led me to flatMapWithState. It's working very well. .keyBy(…) .flatMapWithState[Event, Int] { (event, countOpt) => val count = countOpt.getOrElse(0) if (count < config.limit) (List(event) …
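The truncated snippet above can be sketched as a complete version. Flink's Scala `flatMapWithState` takes a function `(T, Option[S]) => (TraversableOnce[R], Option[S])`; the per-key logic below is written as a plain Scala function so it runs without a Flink cluster. The `Event` type and the `limit` value are assumptions standing in for the poster's own types and `config.limit`.

```scala
// Sketch of the per-key logic passed to flatMapWithState, written as a
// plain Scala function so it runs standalone. In Flink it would be used as:
//   stream.keyBy(_.key).flatMapWithState[Event, Int](countingFlatMap)
// `Event` and `limit` are hypothetical stand-ins for the poster's code.
case class Event(key: String, payload: String)

val limit = 3 // assumed value of config.limit

// Emit the event only while the per-key count is below the limit.
// The Option[Int] plays the role of the keyed state (events seen so far).
def countingFlatMap(event: Event, countOpt: Option[Int]): (List[Event], Option[Int]) = {
  val count = countOpt.getOrElse(0)
  if (count < limit) (List(event), Some(count + 1))
  else (List.empty, Some(count))
}

// Simulate a run over one key: only the first `limit` events pass through.
val events = (1 to 5).map(i => Event("k", s"e$i")).toList
val (emitted, _) = events.foldLeft((List.empty[Event], Option.empty[Int])) {
  case ((out, state), ev) =>
    val (o, s) = countingFlatMap(ev, state)
    (out ++ o, s)
}
// emitted contains e1, e2, e3
```

Because the state per key is a single `Int` rather than a buffer of events, the serialized RocksDB value stays tiny, which is presumably why this sidestepped the 2GiB limit.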

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-12 Thread Ori Popowski
> AFAIK, the 2GB limit is currently still there. As a workaround, maybe you can reduce the state size. If this cannot be done using the window operator, would the KeyedProcessFunction [1] be OK for you? I'll see if I can introduce it to the code. > If you do, the ProcessWindowFunction is getting as a …

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-11 Thread Rafi Aroch
Hi Ori, In your code, are you using the process() API? .process(new MyProcessWindowFunction()); If you do, the ProcessWindowFunction receives as an argument an Iterable with ALL elements collected along the session. This will make the state per key potentially huge (like you're experiencing). As …
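The point Rafi is making can be illustrated standalone. A bare ProcessWindowFunction forces Flink to buffer every element of the session in keyed state, whereas incremental aggregation (in Flink, `.aggregate(agg, windowFn)` with an `AggregateFunction`) keeps only one accumulator per key. The sketch below shows the state-size difference in plain Scala; the names are illustrative, not from the thread.

```scala
// Why process() state grows: the window keeps every element per key in state.
// With incremental aggregation the keyed state is just the accumulator.
// In Flink this would be .aggregate(new CountAggregate, new MyProcessWindowFunction);
// here it is plain Scala so it runs standalone (names are hypothetical).

// Accumulator step of an AggregateFunction-style count.
def add(acc: Long, element: String): Long = acc + 1L

val sessionElements = List("a", "b", "c", "d")

// Buffering (what a bare ProcessWindowFunction implies): O(n) state per key.
val buffered: List[String] = sessionElements

// Incremental: O(1) state per key, regardless of session length.
val count: Long = sessionElements.foldLeft(0L)(add)
// count == 4, while `buffered` still holds all 4 elements
```

For long sessions this is the difference between a per-key RocksDB value that grows without bound and one of constant size.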

Re: Savepoint fails due to RocksDB 2GiB limit

2020-07-11 Thread Congxian Qiu
Hi Ori, AFAIK, the 2GB limit is currently still there. As a workaround, maybe you can reduce the state size. If this cannot be done using the window operator, would the KeyedProcessFunction [1] be OK for you? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process …

Savepoint fails due to RocksDB 2GiB limit

2020-07-08 Thread Ori Popowski
I've asked this question in https://issues.apache.org/jira/browse/FLINK-9268, but it's been inactive for two years so I'm not sure it will be visible. While creating a savepoint I get an org.apache.flink.util.SerializedThrowable: java.lang.NegativeArraySizeException. It's happening because some of m…
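The exception itself has a simple arithmetic explanation, which is worth spelling out: Java (and hence Scala on the JVM) indexes byte arrays with a signed 32-bit `Int`, so a serialized state value larger than about 2GiB overflows to a negative length, and allocating an array with that length throws `NegativeArraySizeException`. A minimal demonstration of the overflow (the 3GiB figure is hypothetical):

```scala
// Java byte arrays are indexed by a signed 32-bit Int, so array length
// maxes out at Int.MaxValue (2^31 - 1, just under 2GiB). A serialized
// RocksDB value bigger than that overflows when its size is narrowed
// to Int, producing a negative length.
val stateSizeBytes: Long = 3L * 1024 * 1024 * 1024 // 3 GiB, hypothetical size
val truncated: Int = stateSizeBytes.toInt          // overflows to a negative value
assert(truncated < 0)
// new Array[Byte](truncated) would throw java.lang.NegativeArraySizeException
```

This is why the failure only appears once a single key's state crosses the 2GiB boundary, and why shrinking per-key state (as suggested later in the thread) makes the savepoint succeed.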