Hi Gerard,

The stack below only indicates that the task was being canceled. The job manager may have triggered that because another task failed. If the task cannot be interrupted within the configured timeout, the task manager process exits.

Do you see any OutOfMemoryError messages in the task manager log? Normally the output serialization buffer is managed by the task manager framework and will not cause an OOM. On the input deserialization side, however, each channel keeps a temporary byte array for holding partial records, and that array is not managed by the framework. I think you should first confirm whether an OOM happened and where it was caused. Maybe check the task failure logs.
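To illustrate the input-side point, here is a minimal Java sketch. It assumes a simplified wire format in which each record is a 4-byte length prefix followed by its payload, and a record may be split across several fixed-size network buffers. The class name `PartialRecordAssembler` is hypothetical, and this is not Flink's actual deserializer. It only shows why a per-channel temporary array has to grow to the size of the whole record before that record can be handed on.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified per-channel reassembly of length-prefixed records
 *  that span several network buffers (not Flink's implementation). */
final class PartialRecordAssembler {
    private final ByteBuffer lengthBytes = ByteBuffer.allocate(4); // pending length prefix
    private byte[] partial; // plain heap array sized to the whole record
    private int filled;     // bytes of the current record received so far

    /** Feeds one network buffer and returns every record it completes. */
    List<byte[]> onBuffer(byte[] segment) {
        List<byte[]> complete = new ArrayList<>();
        int i = 0;
        while (i < segment.length) {
            if (partial == null) {
                // Still collecting the 4-byte length prefix, possibly across buffers.
                lengthBytes.put(segment[i++]);
                if (!lengthBytes.hasRemaining()) {
                    lengthBytes.flip();
                    // The full record size is allocated up front, outside any buffer pool.
                    partial = new byte[lengthBytes.getInt()];
                    lengthBytes.clear();
                    filled = 0;
                }
            } else {
                // Copy as much of the payload as this buffer holds.
                int n = Math.min(partial.length - filled, segment.length - i);
                System.arraycopy(segment, i, partial, filled, n);
                filled += n;
                i += n;
            }
            if (partial != null && filled == partial.length) {
                complete.add(partial);
                partial = null;
            }
        }
        return complete;
    }
}
```

In this model, a record that is hundreds of megabytes long forces the receiving channel to hold one array of that size on the heap until the last buffer arrives. That is consistent with the suggestion to look for an OOM on the receiving side.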
Zhijiang

------------------------------------------------------------------
From: gerardg <ger...@talaia.io>
Sent: Saturday, June 30, 2018 00:12
To: user <user@flink.apache.org>
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory) (fixed formatting)

Hello,

We have experienced some problems where a task just hangs without logging any kind of error, while other tasks running in the same task manager continue without problems. When these tasks are restarted, the task manager gets killed and shows several errors similar to these:

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method:
    java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
    java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
    org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
    org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
    org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
    org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
    org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
    org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
    scala.collection.immutable.List.foreach(List.scala:392)
    org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
    org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
    org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
    org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
    org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
    org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
    org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
    org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
    org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    (...)
    org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    (...)
    org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
    org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
    org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    java.lang.Thread.run(Thread.java:748)

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method:
    org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
    org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
    org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
    org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
    org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
    org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
    scala.collection.immutable.List.foreach(List.scala:392)
    org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
    org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
    org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
    org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
    org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
    org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
    org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
    org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
    org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
    org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    (...)
    org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
    org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    (...)
    org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
    org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
    org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    java.lang.Thread.run(Thread.java:748)

Our task bundles several thousand messages together, so it creates some very large single messages. That could explain why the operator hangs while trying to serialize them. Our problem is that a hanging task is very difficult to detect, and we have to cancel and restart it manually. Is there any way to make the task manager fail, or to increase the memory available for the allocation?

Thanks,
Gerard
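Both traces show the task inside `DataOutputSerializer.resize` while it serializes a single large record. The following Java model gives a rough sense of the memory cost. It assumes a buffer that doubles its backing `byte[]` whenever a write does not fit. The class `GrowingOutputBuffer` and its counters are hypothetical, and this is not Flink's implementation, whose growth policy may differ.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of a serialization buffer that doubles its
 *  backing array whenever a write does not fit (not Flink's source code). */
final class GrowingOutputBuffer {
    private byte[] buffer;
    private int position;
    int resizeCount;  // how many times the backing array was replaced
    long peakBytes;   // largest old+new array footprint seen during a resize

    GrowingOutputBuffer(int initialSize) {
        buffer = new byte[initialSize];
        peakBytes = initialSize;
    }

    int capacity() { return buffer.length; }
    int size() { return position; }

    private void ensureCapacity(int extra) {
        long required = (long) position + extra;
        if (required <= buffer.length) {
            return;
        }
        // Java arrays are int-indexed, so one record is capped near 2 GiB.
        long newLength = Math.min(Math.max(2L * buffer.length, required), Integer.MAX_VALUE - 8L);
        if (newLength < required) {
            throw new IllegalStateException("record too large for a single byte[]");
        }
        // While Arrays.copyOf runs, both the old and the new array are reachable.
        peakBytes = Math.max(peakBytes, buffer.length + newLength);
        buffer = Arrays.copyOf(buffer, (int) newLength);
        resizeCount++;
    }

    /** Writes a big-endian int, growing the buffer first if needed. */
    void writeInt(int v) {
        ensureCapacity(4);
        buffer[position++] = (byte) (v >>> 24);
        buffer[position++] = (byte) (v >>> 16);
        buffer[position++] = (byte) (v >>> 8);
        buffer[position++] = (byte) v;
    }
}
```

In this model, writing one million ints (4,000,000 bytes) into a buffer that starts at 32 bytes takes 17 doublings. It ends with a 4,194,304-byte array and briefly holds 6,291,456 bytes while the last copy is made. A bundle of several thousand case-class elements serialized as one record would similarly need one contiguous array of roughly its full serialized size, plus the previous array during the final copy.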