https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/
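To see which of the JVM memory areas covered in the article above is actually growing inside a TaskManager, the JVM's own accounting can be printed with the standard java.lang.management API. This is a minimal sketch, not a Flink tool. The class name JvmMemoryReport is hypothetical. It only covers memory the JVM itself tracks: heap, non-heap pools such as Metaspace, and NIO direct/mapped buffers. Native allocations made outside the JVM's bookkeeping do not show up here.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

/**
 * Hypothetical diagnostic: prints the memory areas the running JVM accounts for.
 * Native memory allocated outside the JVM's own bookkeeping is NOT included.
 */
class JvmMemoryReport {

    private static final long MIB = 1024L * 1024L;

    /** Formats bytes as whole MiB; the MXBeans use -1 for "undefined / not available". */
    static String mib(long bytes) {
        return bytes < 0 ? "undefined" : (bytes / MIB) + " MiB";
    }

    /** One line per memory area: used, committed and max. */
    static String describe(String name, MemoryUsage usage) {
        return String.format("%-32s used=%s committed=%s max=%s",
                name, mib(usage.getUsed()), mib(usage.getCommitted()), mib(usage.getMax()));
    }

    public static void main(String[] args) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        System.out.println(describe("heap", memory.getHeapMemoryUsage()));
        System.out.println(describe("non-heap", memory.getNonHeapMemoryUsage()));

        // Per-pool breakdown, e.g. GC generations, "Metaspace", "Code Cache" (names vary by JVM).
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage(); // null if the pool is no longer valid
            if (usage != null) {
                System.out.println(describe("pool " + pool.getName(), usage));
            }
        }

        // NIO "direct" and "mapped" buffers: off-heap, but still tracked by the JVM.
        for (BufferPoolMXBean buffers : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            System.out.printf("%-32s count=%d used=%s capacity=%s%n",
                    "buffer pool " + buffers.getName(), buffers.getCount(),
                    mib(buffers.getMemoryUsed()), mib(buffers.getTotalCapacity()));
        }
    }
}
```

Sampling the same MXBeans periodically (in-process or over JMX) and comparing them with the configured memory budget should indicate whether growth is on-heap, in direct buffers, or outside what the JVM reports. In the last case, the process size seen by the OS or Mesos keeps rising while these figures stay flat.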
Backbutton.co.uk ¯\_(ツ)_/¯ ♡۶Java♡۶RMI ♡۶ Make Use Method {MUM} makeuse.org
<http://www.backbutton.co.uk>

On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <lassenedergaardfl...@gmail.com> wrote:
> Hi.
>
> We have migrated to Flink 1.10 and are facing out-of-memory exceptions, and
> hope someone can point us in the right direction.
>
> We have a job that uses broadcast state, and we sometimes run out of memory
> when it creates a savepoint. See the stack trace below.
> We have assigned 2.2 GB per task manager and
> configured taskmanager.memory.process.size: 2200m
> In Flink 1.9 our container was terminated because of OOM, so 1.10 does a better
> job, but it is still not working: the task manager leaks memory on each
> OOM and is finally killed by Mesos.
>
> Any idea what we can do to figure out which settings we need to change?
>
> Thanks in advance
>
> Lasse Nedergaard
>
>
> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
> not close the state stream for
> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
> java.io.IOException: Cannot allocate memory
>     at java.io.FileOutputStream.writeBytes(Native Method)
>     at java.io.FileOutputStream.write(FileOutputStream.java:326)
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>     at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>     at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
>     at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
>     at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>     at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
>     at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
>     at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
>     at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
> org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
>     ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory
>     at java.io.FileOutputStream.writeBytes(Native Method)
>     at java.io.FileOutputStream.write(FileOutputStream.java:326)
>     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
>     at java.io.FilterOutputStream.write(FilterOutputStream.java:77)
>     at java.io.FilterOutputStream.write(FilterOutputStream.java:125)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
>     at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220)
>     at java.io.DataOutputStream.write(DataOutputStream.java:107)
>     at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92)
>     at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113)
>     at org.apache.avro.io.Encoder.writeString(Encoder.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323)
>     at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
>     at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>     at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
>     at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>     at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>     at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
>     at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
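For the taskmanager.memory.process.size: 2200m setting mentioned in the quoted message, the split that Flink 1.10 derives can be worked out by hand. The sketch below assumes every other taskmanager.memory.* option is left at the Flink 1.10 defaults, which should be checked against the docs for the exact version deployed:

* JVM overhead: 10% of process memory, clamped to 192m..1g
* JVM metaspace: 96m
* Managed memory: 40% of total Flink memory
* Network memory: 10% of total Flink memory, clamped to 64m..1g
* Framework heap: 128m
* Framework off-heap: 128m
* Task off-heap: 0

The class FlinkMemoryBudget is hypothetical and works in MiB, whereas Flink itself computes in bytes and rounds.

```java
/**
 * Hypothetical helper that redoes the Flink 1.10 TaskManager memory arithmetic
 * for a given taskmanager.memory.process.size (all values in MiB).
 * ASSUMPTION: every other taskmanager.memory.* option is at its Flink 1.10 default.
 */
class FlinkMemoryBudget {

    // Assumed Flink 1.10 defaults; verify against the documentation of the deployed version.
    static final double JVM_OVERHEAD_FRACTION = 0.1;
    static final double JVM_OVERHEAD_MIN_MB = 192;
    static final double JVM_OVERHEAD_MAX_MB = 1024;
    static final double JVM_METASPACE_MB = 96;
    static final double MANAGED_FRACTION = 0.4;
    static final double NETWORK_FRACTION = 0.1;
    static final double NETWORK_MIN_MB = 64;
    static final double NETWORK_MAX_MB = 1024;
    static final double FRAMEWORK_HEAP_MB = 128;
    static final double FRAMEWORK_OFF_HEAP_MB = 128;
    static final double TASK_OFF_HEAP_MB = 0;

    /** Restricts value to the closed range [min, max]. */
    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    /** Total Flink memory = process memory - JVM overhead - JVM metaspace. */
    static double totalFlinkMb(double processMb) {
        double overhead = clamp(processMb * JVM_OVERHEAD_FRACTION,
                JVM_OVERHEAD_MIN_MB, JVM_OVERHEAD_MAX_MB);
        return processMb - overhead - JVM_METASPACE_MB;
    }

    /** Task heap = what is left of total Flink memory after all other components. */
    static double taskHeapMb(double processMb) {
        double flink = totalFlinkMb(processMb);
        double managed = flink * MANAGED_FRACTION;
        double network = clamp(flink * NETWORK_FRACTION, NETWORK_MIN_MB, NETWORK_MAX_MB);
        return flink - managed - network
                - FRAMEWORK_HEAP_MB - FRAMEWORK_OFF_HEAP_MB - TASK_OFF_HEAP_MB;
    }

    /** JVM heap (-Xmx) = framework heap + task heap. */
    static double jvmHeapMb(double processMb) {
        return FRAMEWORK_HEAP_MB + taskHeapMb(processMb);
    }

    public static void main(String[] args) {
        double processMb = 2200; // taskmanager.memory.process.size: 2200m
        System.out.printf("total Flink memory: %.1f MiB%n", totalFlinkMb(processMb));
        System.out.printf("task heap:          %.1f MiB%n", taskHeapMb(processMb));
        System.out.printf("JVM heap (-Xmx):    %.1f MiB%n", jvmHeapMb(processMb));
    }
}
```

Under those assumptions, the 2200m process size leaves about 1884 MiB of total Flink memory. Of that, roughly 686 MiB is task heap, so the whole JVM heap (framework plus task heap) is only around 814 MiB. Operator state, including the broadcast state that HeapBroadcastState serializes in the trace above, is held on that heap.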