Hi Marco,

Could you share the preconfiguration logs? They are printed at the beginning of the TaskManagers' logs and contain a summary of the memory configuration in use.
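
The reason that summary matters: the amount allocated to a TaskManager is not the same as the JVM heap a window function can use. The sketch below is only a back-of-the-envelope estimate. It assumes the 1.5 GB was set as taskmanager.memory.process.size and that every other memory option is left at what I believe are the Flink 1.11 defaults (256 MB metaspace, JVM overhead of 10% clamped to 192 MB..1 GB, managed memory 40% and network 10% of total Flink memory, 128 MB each for framework heap and framework off-heap). The class and method names are made up for illustration; the preconfiguration logs will show the real numbers.

```java
// Rough estimate of the TaskManager heap from the total process size.
// All constants are ASSUMED Flink 1.11 defaults in MB; adjust them to your flink-conf.yaml.
final class TaskManagerMemorySketch {
    static final double METASPACE_MB = 256;
    static final double FRAMEWORK_HEAP_MB = 128;
    static final double FRAMEWORK_OFF_HEAP_MB = 128;
    static final double OVERHEAD_FRACTION = 0.1;   // fraction of total process memory
    static final double OVERHEAD_MIN_MB = 192;
    static final double OVERHEAD_MAX_MB = 1024;
    static final double MANAGED_FRACTION = 0.4;    // fraction of total Flink memory
    static final double NETWORK_FRACTION = 0.1;    // fraction of total Flink memory
    static final double NETWORK_MIN_MB = 64;
    static final double NETWORK_MAX_MB = 1024;

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    // Total Flink memory = process memory minus metaspace and JVM overhead.
    static double totalFlinkMb(double processMb) {
        double overhead = clamp(OVERHEAD_FRACTION * processMb, OVERHEAD_MIN_MB, OVERHEAD_MAX_MB);
        return processMb - METASPACE_MB - overhead;
    }

    // Task heap = what is left after managed, network and framework memory.
    static double taskHeapMb(double processMb) {
        double flink = totalFlinkMb(processMb);
        double managed = MANAGED_FRACTION * flink;
        double network = clamp(NETWORK_FRACTION * flink, NETWORK_MIN_MB, NETWORK_MAX_MB);
        return flink - managed - network - FRAMEWORK_HEAP_MB - FRAMEWORK_OFF_HEAP_MB;
    }

    // JVM heap (-Xmx) = framework heap + task heap.
    static double jvmHeapMb(double processMb) {
        return FRAMEWORK_HEAP_MB + taskHeapMb(processMb);
    }

    public static void main(String[] args) {
        double processMb = 1536; // 1.5 GB, assumed to be the process size
        System.out.printf("total Flink memory: %.1f MB%n", totalFlinkMb(processMb));
        System.out.printf("task heap:          %.1f MB%n", taskHeapMb(processMb));
        System.out.printf("JVM heap:           %.1f MB%n", jvmHeapMb(processMb));
    }
}
```

Under those assumptions, 1536 MB of process memory leaves about 288 MB of task heap (about 416 MB of JVM heap once the framework heap is included). With the RocksDB state backend the managed memory is, by default, used by RocksDB off-heap, so a large Kryo-deserialized object still has to fit into that heap, which would be consistent with the "GC overhead limit exceeded" errors.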

Best,
Matthias

On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:

> I have a flink job that collects and aggregates time-series data from many
> devices into one object (let's call that X) that was collected by a window.
>
> X contains time-series data, so it contains many String, Instant, a
> HashMap, and another type (Let's call Y) objects.
>
> When I collect 4 X instances, and it contains 800000 Y instances, that
> equates to approximately 172 MB of data.
>
> That should be okay, because my machine has 32 GB ram, and I allocated 1.5
> GB to each task manager.
>
> However, it fails due to out of memory errors, and I think it happens
> during serialization. I am not sure if that's a coincidence or fact.
>
> I am using RocksDB state backend, as well Kryo serialization.
>
> I am already refactoring my code from Processing Time semantics to Event
> Time semantics, and I am trying to store smaller sized objects in keyed
> state, rather than this large object, but in the meantime, our machines
> have plenty of memory. What can I do to fix this?
>
> SAMPLE STACK TRACE
>
> 2020-12-17 02:45:55,524 WARN org.apache.flink.runtime.taskmanager.Task [] - enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.OutOfMemoryError: GC overhead limit exceeded
>     ... 12 more
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> 2020-12-17 02:45:55,554 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d).
> 2020-12-17 02:45:38,981 WARN org.apache.flink.runtime.taskmanager.Task [] - aggregate daily average window function (2/2) (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown Source) ~[?:?]
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_252]
>     at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
>     at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[965ae4d.jar:?]
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[965ae4d.jar:?]
>     at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.getInternal(AbstractRocksDBAppendingState.java:64) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:101) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$539/1319281920.runDefaultAction(Unknown Source) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]