Hi Marco,
Could you share the preconfiguration logs? They are printed at the
beginning of the TaskManager logs and contain a summary of the memory
configuration in use.
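
For context on why that summary matters: suppose the 1.5 GB you allocated is
`taskmanager.memory.process.size` and every other memory option keeps its
Flink 1.11 default. In that case much less than 1.5 GB ends up as JVM heap,
because metaspace, JVM overhead, managed memory (used off-heap by RocksDB)
and network buffers are carved out first. The Java sketch below reproduces
that arithmetic under exactly those assumptions. The class and method names
are made up for illustration, and the default values are my reading of the
1.11 memory model, so the logged summary remains the authoritative source.

```java
// Hypothetical sketch: estimates the Flink 1.11 TaskManager memory split,
// ASSUMING "1.5 GB" means taskmanager.memory.process.size and every other
// memory option keeps its (assumed) 1.11 default. All values are in MiB.
public final class TmMemoryEstimate {

    // Defaults assumed from the Flink 1.11 memory model.
    static final double METASPACE = 256;          // jvm-metaspace.size
    static final double FRAMEWORK_HEAP = 128;     // framework.heap.size
    static final double FRAMEWORK_OFF_HEAP = 128; // framework.off-heap.size
    static final double TASK_OFF_HEAP = 0;        // task.off-heap.size
    static final double MANAGED_FRACTION = 0.4;   // managed.fraction (RocksDB)
    static final double NETWORK_FRACTION = 0.1;   // network.fraction

    static double clamp(double v, double min, double max) {
        return Math.max(min, Math.min(max, v));
    }

    // JVM overhead: 10% of process memory, clamped to [192, 1024] MiB.
    public static double jvmOverhead(double process) {
        return clamp(0.1 * process, 192, 1024);
    }

    // Total Flink memory = process memory minus metaspace and JVM overhead.
    public static double totalFlink(double process) {
        return process - METASPACE - jvmOverhead(process);
    }

    // Managed memory: a fraction of total Flink memory (off-heap).
    public static double managed(double process) {
        return MANAGED_FRACTION * totalFlink(process);
    }

    // Network memory: 10% of total Flink memory, clamped to [64, 1024] MiB.
    public static double network(double process) {
        return clamp(NETWORK_FRACTION * totalFlink(process), 64, 1024);
    }

    // Task heap is whatever remains of total Flink memory.
    public static double taskHeap(double process) {
        return totalFlink(process) - FRAMEWORK_HEAP - FRAMEWORK_OFF_HEAP
                - TASK_OFF_HEAP - managed(process) - network(process);
    }

    // The JVM heap (-Xmx) is framework heap plus task heap.
    public static double jvmHeap(double process) {
        return FRAMEWORK_HEAP + taskHeap(process);
    }

    public static void main(String[] args) {
        double process = 1536; // 1.5 GiB, assumed to be the process size
        System.out.printf("Total Flink memory: %.1f MiB%n", totalFlink(process));
        System.out.printf("Managed (RocksDB):  %.1f MiB%n", managed(process));
        System.out.printf("Network:            %.1f MiB%n", network(process));
        System.out.printf("Task heap:          %.1f MiB%n", taskHeap(process));
        System.out.printf("JVM heap (-Xmx):    %.1f MiB%n", jvmHeap(process));
    }
}
```

Under these assumptions the JVM heap comes out at about 416 MiB (128 MiB
framework heap plus 288 MiB task heap). That is the same order of magnitude
as the ~172 MB you reported for four X instances, even before accounting for
the deserialized object graph, which is typically larger than its
serialized form.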

Best,
Matthias

On Tue, Jan 26, 2021 at 6:35 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:

>
> I have a Flink job that collects time-series data from many devices in a
> window and aggregates it into one object (let's call it X).
>
> X contains time-series data, so it holds many String and Instant objects,
> a HashMap, and objects of another type (let's call it Y).
>
> When I collect 4 X instances containing 800,000 Y instances, that amounts
> to approximately 172 MB of data.
>
> That should be fine, because my machine has 32 GB of RAM and I allocated
> 1.5 GB to each task manager.
>
> However, the job fails with out-of-memory errors, which I think occur
> during serialization, although I am not sure whether that is a coincidence.
>
> I am using the RocksDB state backend as well as Kryo serialization.
>
> I am already refactoring my code from processing-time to event-time
> semantics, and I am trying to store smaller objects in keyed state instead
> of this large one. In the meantime, though, our machines have plenty of
> memory. What can I do to fix this?
>
> SAMPLE STACK TRACE
>
> 2020-12-17 02:45:55,524 WARN  org.apache.flink.runtime.taskmanager.Task                   [] - enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1088) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1062) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1183) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78) [flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:270) [flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190) [flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) [flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) [flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) [flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.OutOfMemoryError: GC overhead limit exceeded
>         ... 12 more
> Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
> 2020-12-17 02:45:55,554 INFO  org.apache.flink.runtime.taskmanager.Task                   [] - Triggering cancellation of task code enrich information related to tag metadata to sensor time series (2/2) (b6ba76c4cc6bc7fdbd2c69332442742d).
> 2020-12-17 02:45:38,981 WARN  org.apache.flink.runtime.taskmanager.Task                   [] - aggregate daily average window function (2/2) (745ff4669f9c1812de5b717c87a36a26) switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>         at sun.reflect.GeneratedSerializationConstructorAccessor230.newInstance(Unknown Source) ~[?:?]
>         at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_252]
>         at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
>         at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:45) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21) ~[965ae4d.jar:?]
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[965ae4d.jar:?]
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346) ~[965ae4d.jar:?]
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:411) ~[965ae4d.jar:?]
>         at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.getInternal(AbstractRocksDBAppendingState.java:64) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.contrib.streaming.state.RocksDBAggregatingState.add(RocksDBAggregatingState.java:101) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$539/1319281920.runDefaultAction(Unknown Source) ~[?:?]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [965ae4d.jar:?]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [965ae4d.jar:?]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
>
>
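
Independently of the memory settings, the second stack trace shows
`RocksDBAggregatingState.add` deserializing the window's accumulator through
`PojoSerializer`, which hands a HashMap field to Kryo's `MapSerializer`,
before each element is added. With the RocksDB backend, state is kept in
serialized form, so every incoming record rebuilds that whole accumulator on
the heap. In the spirit of the smaller keyed state you mention, here is a
hypothetical sketch of a compact accumulator for a daily average, assuming
the average is over numeric readings. `AverageAccumulator` is an illustrative
name; its methods only mirror the add/merge/getResult steps of Flink's
`AggregateFunction` so that the sketch compiles without Flink. It follows
Flink's POJO rules (public class, public no-argument constructor, public
non-final fields), so it should not need Kryo at all.

```java
// Hypothetical sketch: a compact accumulator for a per-key daily average.
// It keeps only a running sum and count instead of every Y instance, so the
// bytes that RocksDB must deserialize and re-serialize per add stay constant.
public class AverageAccumulator {
    public double sum;   // running sum of readings
    public long count;   // number of readings folded in

    // Public no-arg constructor, as required for a Flink POJO.
    public AverageAccumulator() {}

    // Mirrors AggregateFunction#add: fold one reading into the accumulator.
    public AverageAccumulator add(double value) {
        sum += value;
        count++;
        return this;
    }

    // Mirrors AggregateFunction#merge: combine two partial accumulators.
    public AverageAccumulator merge(AverageAccumulator other) {
        sum += other.sum;
        count += other.count;
        return this;
    }

    // Mirrors AggregateFunction#getResult: NaN when no readings were seen.
    public double average() {
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        AverageAccumulator a = new AverageAccumulator().add(1.0).add(2.0);
        AverageAccumulator b = new AverageAccumulator().add(6.0);
        System.out.println(a.merge(b).average()); // prints 3.0
    }
}
```

In the real job such a class would be the `ACC` type of an
`AggregateFunction<IN, ACC, OUT>` passed to `aggregate(...)` on the windowed
stream, so its serialized size no longer grows with the number of readings
in the window.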
