I have a Flink job that collects time-series data from many devices and
aggregates it, per window, into a single object (let's call it X).

X holds time-series data, so it contains many String and Instant objects,
a HashMap, and objects of another type (let's call it Y).

When I collect 4 X instances containing 800,000 Y instances, that equates
to approximately 172 MB of data.

That should be fine, because my machine has 32 GB of RAM and I allocated
1.5 GB to each task manager.
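
For reference, here is a rough sketch of how a 1.5 GB task manager might be
split up under Flink 1.11's memory model. It assumes the 1.5 GB was set as
taskmanager.memory.process.size and that every other memory option is left
at what I understand to be the 1.11 defaults (the constants below); both
are assumptions, not something taken from my actual configuration. The
class and method names are only illustrative.

```java
/** Rough estimate of the task heap left over from a TaskManager process size (all values in MB). */
final class TaskManagerMemoryEstimate {
    // ASSUMPTION: Flink 1.11 default values; check them against your flink-conf.yaml.
    static final double JVM_OVERHEAD_FRACTION = 0.1;
    static final double JVM_OVERHEAD_MIN_MB = 192;
    static final double JVM_OVERHEAD_MAX_MB = 1024;
    static final double JVM_METASPACE_MB = 256;
    static final double MANAGED_FRACTION = 0.4;   // used by RocksDB, off-heap
    static final double NETWORK_FRACTION = 0.1;
    static final double NETWORK_MIN_MB = 64;
    static final double NETWORK_MAX_MB = 1024;
    static final double FRAMEWORK_HEAP_MB = 128;
    static final double FRAMEWORK_OFF_HEAP_MB = 128;

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    /** Heap available to user code (operators, deserialized objects) for a given process size. */
    static double taskHeapMb(double processSizeMb) {
        // JVM overhead is derived from the total process size.
        double overhead = clamp(processSizeMb * JVM_OVERHEAD_FRACTION,
                JVM_OVERHEAD_MIN_MB, JVM_OVERHEAD_MAX_MB);
        double totalFlink = processSizeMb - overhead - JVM_METASPACE_MB;
        // Managed and network memory are derived from the total Flink memory.
        double managed = totalFlink * MANAGED_FRACTION;
        double network = clamp(totalFlink * NETWORK_FRACTION, NETWORK_MIN_MB, NETWORK_MAX_MB);
        return totalFlink - managed - network - FRAMEWORK_HEAP_MB - FRAMEWORK_OFF_HEAP_MB;
    }

    public static void main(String[] args) {
        System.out.printf("Estimated task heap: %.0f MB%n", taskHeapMb(1536));
    }
}
```

Under those assumptions the task heap comes out at roughly 288 MB, which is
far less than 1.5 GB and not much more than the 172 MB of serialized data,
before counting Java object overhead.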

However, the job fails with out-of-memory errors
(java.lang.OutOfMemoryError: GC overhead limit exceeded), and I think it
happens during serialization. I am not sure whether that is a coincidence
or the actual cause.

I am using the RocksDB state backend, as well as Kryo serialization.
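
The second trace below goes through RocksDBAggregatingState.add, then
getInternal, then KryoSerializer.deserialize, which suggests that every
element added to the window reads and deserializes the whole accumulator.
The following is a minimal, Flink-free sketch of that read-modify-write
pattern. Java's built-in serialization stands in for Kryo, a byte array
stands in for the RocksDB value, and all class and method names are
hypothetical. It compares the bytes that have to be deserialized when one
growing object is stored as a single value against the bytes written when
each element is stored as its own small entry (as I understand it, roughly
what ListState or MapState do on RocksDB).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

final class ReadModifyWriteCost {
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static HashMap<Integer, String> deserialize(byte[] stored) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(stored))) {
            return (HashMap<Integer, String>) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Total bytes deserialized when one growing accumulator is kept as a single stored value. */
    static long bytesReadWithSingleAccumulator(int elements) {
        byte[] stored = serialize(new HashMap<Integer, String>());
        long bytesRead = 0;
        for (int i = 0; i < elements; i++) {
            bytesRead += stored.length;                          // read the whole value
            HashMap<Integer, String> accumulator = deserialize(stored);
            accumulator.put(i, "reading-" + i);                  // modify
            stored = serialize(accumulator);                     // write the whole value back
        }
        return bytesRead;
    }

    /** Total bytes written when each element is stored as its own entry; nothing is read back. */
    static long bytesWrittenWithPerElementEntries(int elements) {
        long bytesWritten = 0;
        for (int i = 0; i < elements; i++) {
            bytesWritten += serialize("reading-" + i).length;
        }
        return bytesWritten;
    }

    public static void main(String[] args) {
        int n = 2000;
        System.out.println("single accumulator, bytes read:  " + bytesReadWithSingleAccumulator(n));
        System.out.println("per-element entries, bytes written: " + bytesWrittenWithPerElementEntries(n));
    }
}
```

The single-accumulator cost grows roughly quadratically with the number of
elements, and each add also materializes the full object graph on the heap,
which would be consistent with the garbage collector being overwhelmed.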

I am already refactoring my code from processing-time to event-time
semantics, and I am trying to store smaller objects in keyed state rather
than this one large object. In the meantime, though, our machines have
plenty of memory. What can I do to fix this?

SAMPLE STACK TRACE

2020-12-17 02:45:55,524 WARN  org.apache.flink.runtime.taskmanager.Task                [] - enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
        at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.OutOfMemoryError: GC overhead limit exceeded
        ... 12 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
2020-12-17 02:45:55,554 INFO  org.apache.flink.runtime.taskmanager.Task                [] - Triggering cancellation of task code enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d).
2020-12-17 02:45:38,981 WARN  org.apache.flink.runtime.taskmanager.Task                [] - aggregate daily average window function (2/2) (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_252]
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
        at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[965ae4d.jar:?]
        at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[965ae4d.jar:?]
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.getInternal(AbstractRocksDBAppendingState.java:64) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:101) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$539/1319281920.runDefaultAction(Unknown Source) ~[?:?]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
