I have a flink job that collects and aggregates time-series data from many devices into one object (let's call that X) that was collected by a window.
X contains time-series data, so it contains many String, Instant, a HashMap, and another type (Let's call Y) objects. When I collect 4 X instances, and it contains 800000 Y instances, that equates to approximately 172 MB of data. That should be okay, because my machine has 32 GB ram, and I allocated 1.5 GB to each task manager. However, it fails due to out of memory errors, and I think it happens during serialization. I am not sure if that's a coincidence or fact. I am using RocksDB state backend, as well Kryo serialization. I am already refactoring my code from Processing Time semantics to Event Time semantics, and I am trying to store smaller sized objects in keyed state, rather than this large object, but in the meantime, our machines have plenty of memory. What can I do to fix this? SAMPLE STACK TRACE 2020-12-17 02:45:55,524 WARN org.apache.flink.runtime.taskmanager.Task [] - enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING to FAILED. org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer. at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252] Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.OutOfMemoryError: GC overhead limit exceeded ... 12 more Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded 2020-12-17 02:45:55,554 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d). 2020-12-17 02:45:38,981 WARN org.apache.flink.runtime.taskmanager.Task [] - aggregate daily average window function (2/2) (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED. java.lang.OutOfMemoryError: GC overhead limit exceeded at sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown Source) ~[?:?] at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_252] at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?] at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[965ae4d.jar:?] at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[965ae4d.jar:?] at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.getInternal(AbstractRocksDBAppendingState.java:64) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:101) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$539/1319281920.runDefaultAction(Unknown Source) ~[?:?] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]