Hi,
Eventually flatMapWithState solved the problem. I started by looking into
KeyedProcessFunction, which led me to flatMapWithState. It's working very
well.
.keyBy(…)
.flatMapWithState[Event, Int] { (event, countOpt) =>
val count = countOpt.getOrElse(0)
if (count < config.limit) (List(event)
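
The snippet above is cut off, so here is a minimal plain-Scala sketch of the kind of function flatMapWithState expects: it takes the element and the previous state as an Option, and returns the elements to emit together with the new state. The names Event, limitFn, runPerKey and the limit parameter are assumptions made for illustration, and the else branch is a guess rather than the author's actual code. The small driver only imitates per-key state with a Map so the logic can be tried without a Flink cluster.

```scala
// Hypothetical stand-ins: Event, limitFn and runPerKey are not from the original message.
final case class Event(key: String, payload: String)

object LimitPerKey {
  // Same shape as the function given to flatMapWithState[Event, Int]:
  // (input, previous state) => (emitted elements, new state).
  def limitFn(limit: Int)(event: Event, countOpt: Option[Int]): (List[Event], Option[Int]) = {
    val count = countOpt.getOrElse(0)
    if (count < limit) (List(event), Some(count + 1)) // emit and bump the counter
    else (Nil, Some(count))                            // assumed: drop and keep the counter
  }

  // Local driver that imitates keyed state with a Map; Flink manages this per key.
  def runPerKey(events: Seq[Event], limit: Int): List[Event] = {
    val (out, _) = events.foldLeft((List.empty[Event], Map.empty[String, Int])) {
      case ((acc, state), e) =>
        val (emitted, next) = limitFn(limit)(e, state.get(e.key))
        // A None result clears the state for that key, a Some replaces it.
        val newState = next.fold(state - e.key)(c => state.updated(e.key, c))
        (acc ++ emitted, newState)
    }
    out
  }
}
```

With this shape the state per key is a single Int, no matter how many events arrive for that key.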
> AFAIK, the 2GB limit is currently still there. As a workaround, maybe you
can reduce the state size. If this cannot be done using the window
operator, would the KeyedProcessFunction[1] work for you?
I'll see if I can introduce it to the code.
> if you do, the ProcessWindowFunction is getting as a
Hi Ori,
In your code, are you using the process() API?
.process(new MyProcessWindowFunction());
If you do, the ProcessWindowFunction receives as its argument an Iterable
with ALL elements collected during the session. This makes the state per
key potentially huge (like you're experiencing).
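
To illustrate why the buffered Iterable matters, here is a small plain-Scala model (not Flink code) that contrasts keeping every element until the window fires with folding each element into a fixed-size accumulator as it arrives. The latter is the idea behind incremental aggregation such as reduce() or aggregate() on a windowed stream. CountAcc, buffered and incremental are hypothetical names, and the count and latest timestamp are just an assumed example of what a session might need to compute.

```scala
// Hypothetical model: CountAcc and the helper names are illustrative, not Flink classes.
final case class CountAcc(count: Long, lastTs: Long)

object SessionState {
  // process()-style: every element is retained until the window fires,
  // so the retained state grows with the length of the session.
  // Assumes a non-empty input, since max is undefined on an empty buffer.
  def buffered(timestamps: Seq[Long]): (Int, CountAcc) = {
    val buffer = timestamps.toVector
    (buffer.size, CountAcc(buffer.size.toLong, buffer.max))
  }

  // Incremental style: each element is folded into one accumulator on arrival,
  // so the retained state stays constant in size.
  def incremental(timestamps: Seq[Long]): CountAcc =
    timestamps.foldLeft(CountAcc(0L, Long.MinValue)) { (acc, ts) =>
      CountAcc(acc.count + 1, math.max(acc.lastTs, ts))
    }
}
```

Both variants produce the same result for the same input. Only the amount of state held per key differs.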
As
Hi Ori
AFAIK, the 2GB limit is currently still there. As a workaround, maybe you can
reduce the state size. If this cannot be done using the window operator,
would the KeyedProcessFunction[1] work for you?
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process
I've asked this question in https://issues.apache.org/jira/browse/FLINK-9268
but it's been inactive for two years so I'm not sure it will be visible.
While creating a savepoint I get an org.apache.flink.util.SerializedThrowable:
java.lang.NegativeArraySizeException. It's happening because some of m