Hi.

We have migrated to Flink 1.10 and are facing out-of-memory exceptions, and we
hope someone can point us in the right direction.

We have a job that uses broadcast state, and it sometimes runs out of memory
while creating a savepoint. See the stack traces below.
Each task manager is assigned 2.2 GB, configured as:

    taskmanager.memory.process.size: 2200m

In Flink 1.9 our container was terminated because of OOM. Flink 1.10 handles
this better, but it still does not work: the task manager leaks memory on each
OOM and is finally killed by Mesos.
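To reason about which memory component runs out, it can help to see how Flink 1.10 splits a 2200m process size. The sketch below is a rough estimate that ASSUMES every other taskmanager.memory.* option keeps its 1.10 default (JVM metaspace 96m; JVM overhead 10% of the process size, clamped to 192m..1g; framework heap and framework off-heap 128m each; task off-heap 0; managed fraction 0.4; network fraction 0.1, clamped to 64m..1g). Flink itself computes in bytes and checks more constraints, so treat the output as approximate; the class name is hypothetical.

```java
// Rough estimate of how Flink 1.10 splits taskmanager.memory.process.size,
// ASSUMING every other taskmanager.memory.* option keeps its 1.10 default.
// Flink itself computes in bytes and validates more constraints.
final class FlinkMemoryEstimate {

    // Assumed Flink 1.10 defaults, in MB.
    static final double METASPACE_MB = 96;
    static final double OVERHEAD_FRACTION = 0.1;
    static final double OVERHEAD_MIN_MB = 192;
    static final double OVERHEAD_MAX_MB = 1024;
    static final double FRAMEWORK_HEAP_MB = 128;
    static final double FRAMEWORK_OFF_HEAP_MB = 128;
    static final double TASK_OFF_HEAP_MB = 0;
    static final double MANAGED_FRACTION = 0.4;
    static final double NETWORK_FRACTION = 0.1;
    static final double NETWORK_MIN_MB = 64;
    static final double NETWORK_MAX_MB = 1024;

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    // JVM overhead: a fraction of the whole process size, clamped to [min, max].
    static double overheadMb(double processMb) {
        return clamp(processMb * OVERHEAD_FRACTION, OVERHEAD_MIN_MB, OVERHEAD_MAX_MB);
    }

    // Total Flink memory: process size minus metaspace and JVM overhead.
    static double totalFlinkMb(double processMb) {
        return processMb - METASPACE_MB - overheadMb(processMb);
    }

    // Managed memory: a fraction of total Flink memory, allocated outside the heap.
    static double managedMb(double processMb) {
        return totalFlinkMb(processMb) * MANAGED_FRACTION;
    }

    // Network buffers: a fraction of total Flink memory, clamped to [min, max].
    static double networkMb(double processMb) {
        return clamp(totalFlinkMb(processMb) * NETWORK_FRACTION, NETWORK_MIN_MB, NETWORK_MAX_MB);
    }

    // Task heap: whatever remains of total Flink memory after the other components.
    static double taskHeapMb(double processMb) {
        return totalFlinkMb(processMb) - FRAMEWORK_HEAP_MB - FRAMEWORK_OFF_HEAP_MB
                - TASK_OFF_HEAP_MB - managedMb(processMb) - networkMb(processMb);
    }

    // -Xmx / -Xms: framework heap + task heap.
    static double jvmHeapMb(double processMb) {
        return FRAMEWORK_HEAP_MB + taskHeapMb(processMb);
    }

    // -XX:MaxDirectMemorySize: framework off-heap + task off-heap + network.
    static double maxDirectMb(double processMb) {
        return FRAMEWORK_OFF_HEAP_MB + TASK_OFF_HEAP_MB + networkMb(processMb);
    }

    public static void main(String[] args) {
        double processMb = 2200; // taskmanager.memory.process.size: 2200m
        System.out.printf("JVM overhead                     %7.1f MB%n", overheadMb(processMb));
        System.out.printf("JVM metaspace                    %7.1f MB%n", METASPACE_MB);
        System.out.printf("Total Flink memory               %7.1f MB%n", totalFlinkMb(processMb));
        System.out.printf("  JVM heap (-Xmx)                %7.1f MB%n", jvmHeapMb(processMb));
        System.out.printf("  Managed memory                 %7.1f MB%n", managedMb(processMb));
        System.out.printf("  Direct (MaxDirectMemorySize)   %7.1f MB%n", maxDirectMb(processMb));
    }
}
```

Under these assumptions, 2200m leaves roughly 814 MB of JVM heap (framework heap plus task heap), which is where HeapBroadcastState keeps the broadcast state, while about 754 MB is reserved as managed memory and about 316 MB as the direct-memory limit.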


Any ideas on how we can figure out which settings we need to change?
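The "Cannot allocate memory" in the traces below is an IOException raised from the native FileOutputStream.writeBytes call, i.e. the operating system refused an allocation; it is not a java.lang.OutOfMemoryError from the heap. One way to see which JVM pool grows across failed checkpoints is to log the JVM's own memory counters from inside the task manager. This is a minimal sketch using only java.lang.management; the helper name and the idea of calling it periodically (for example from a rich function) are assumptions, not a Flink API. It cannot see native allocations outside the JVM's accounting, which Mesos still counts against the container.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper (not part of Flink): formats the JVM's heap,
// non-heap and NIO buffer-pool usage so it can be logged around checkpoints.
final class JvmMemorySnapshot {

    private static final long MB = 1024L * 1024L;

    static List<String> describe() {
        List<String> lines = new ArrayList<>();
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        MemoryUsage nonHeap = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        // getMax() may be -1 when undefined, which prints as 0 here.
        lines.add(String.format("heap: used=%d MB committed=%d MB max=%d MB",
                heap.getUsed() / MB, heap.getCommitted() / MB, heap.getMax() / MB));
        // Non-heap covers metaspace and the code cache, among others.
        lines.add(String.format("non-heap: used=%d MB committed=%d MB",
                nonHeap.getUsed() / MB, nonHeap.getCommitted() / MB));
        // "direct" is bounded by -XX:MaxDirectMemorySize; "mapped" is memory-mapped files.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            lines.add(String.format("buffer pool %s: count=%d used=%d MB capacity=%d MB",
                    pool.getName(), pool.getCount(),
                    pool.getMemoryUsed() / MB, pool.getTotalCapacity() / MB));
        }
        return lines;
    }

    public static void main(String[] args) {
        describe().forEach(System.out::println);
    }
}
```

If these counters stay flat while the container's resident memory keeps growing, the growth is likely native. Starting the task manager JVM with -XX:NativeMemoryTracking=summary (for example via env.java.opts.taskmanager) and running `jcmd <pid> VM.native_memory summary` can then break down the JVM's own native usage.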

Thanks in advance

Lasse Nedergaard


WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could not close the state stream for s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
java.io.IOException: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
    at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
    at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263)
    at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250)
    at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint 3509 for operator Feature extraction (8/12).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot allocate memory
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
    ... 3 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Cannot allocate memory
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:95)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:77)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:125)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
    at org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:220)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeBytes(DataOutputEncoder.java:92)
    at org.apache.flink.formats.avro.utils.DataOutputEncoder.writeString(DataOutputEncoder.java:113)
    at org.apache.avro.io.Encoder.writeString(Encoder.java:130)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:323)
    at org.apache.avro.generic.GenericDatumWriter.writeMap(GenericDatumWriter.java:281)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:144)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
    at org.apache.flink.runtime.state.HeapBroadcastState.write(HeapBroadcastState.java:109)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:167)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)