Thanks Zhijiang,

Yes, I guess our best option right now is to simplify the structure of the output record and see if that solves the problem.
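One way to act on this is to split the bundle before emitting it. The sketch below assumes the bundled messages are available as a java.util.List before they are wrapped into the big output record. The helper name splitIntoBatches and the batch size of 1000 are hypothetical choices for illustration, not part of the original job.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {

    // Hypothetical helper: splits one large bundle into consecutive batches of at
    // most maxPerBatch items, so that each emitted record stays small.
    public static <T> List<List<T>> splitIntoBatches(List<T> bundle, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < bundle.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, bundle.size());
            // Copy the subList view so each batch does not depend on the source list.
            batches.add(new ArrayList<>(bundle.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> bundle = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            bundle.add(i);
        }
        // 1000 is an arbitrary example batch size.
        List<List<Integer>> batches = splitIntoBatches(bundle, 1000);
        // Prints "3 batches, last has 500"
        System.out.println(batches.size() + " batches, last has "
                + batches.get(batches.size() - 1).size());
    }
}
```

In a Flink flatMap or process function, each batch would then be emitted with its own out.collect(...) call, so the serializer handles several smaller records instead of one record holding thousands of nested messages. A suitable batch size depends on the real record structure and would need to be measured.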
Gerard

On Tue, Jul 17, 2018 at 4:56 PM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
>
> From the jstack you provided, the task is serializing the output record, and during this process it does not process any more input data. This stack does not indicate an out-of-memory issue. And if the output buffers were exhausted, the task would be blocked in requestBufferBlocking instead.
>
> I think the key point is that your output record is too large and has a complicated structure: every field and collection in this class has to be traversed during serialization, which costs a lot of time and CPU. Furthermore, the checkpoint cannot be done because it is waiting for the lock, which is also held by the task's output process.
>
> As you mentioned, it makes sense to check the data structure of the output record and reduce its size or make it more lightweight to handle.
>
> Best,
>
> Zhijiang
>
> ------------------------------------------------------------------
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Tuesday, July 17, 2018 21:53
> To: piotr <pi...@data-artisans.com>
> Cc: fhueske <fhue...@gmail.com>; wangzhijiang999 <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>; nico <n...@data-artisans.com>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Yes, I'm using Flink 1.5.0, and what I'm serializing is a really big record (probably too big; we have already started working to reduce its size) which consists of several case classes that have (among others) fields of type String.
>
> I attach a CPU profile of the thread stuck serializing. I also attach the memory and GC telemetry that the profiler shows (which may be more informative than what was recorded from the JVM metrics). Only one node was actually "doing something"; all the others had CPU usage near zero.
>
> The task is at the same time trying to perform a checkpoint but keeps failing.
> Would it make sense that the problem is that there is not enough memory available to perform the checkpoint, so all operators are stuck waiting for it to finish, and at the same time the operator stuck serializing is holding all the memory, so neither it nor the checkpoint can advance?
>
> I realized that I don't have a minimum pause between checkpoints, so it is continuously trying. Maybe I can reduce the checkpoint timeout from the 10m default and introduce a minimum pause (e.g. 5m timeout and 5m minimum pause), and this way I could break the deadlock.
>
> Gerard
>
> On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> Thanks for the additional data. Just to make sure, are you using Flink 1.5.0?
>
> There are a couple of threads that seem to be looping in serialisation, while others are blocked and either waiting for new data or waiting for someone to consume some data. Could you debug or CPU-profile the code, focusing in particular on threads with a stack trace like the one below [1]? Aren't you trying to serialise some gigantic String?
>
> Piotrek
>
> [1]:
>
> "(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x00007f52584d2800 nid=0x6819 runnable [0x00007f451a843000]
>    java.lang.Thread.State: RUNNABLE
>     at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
>     at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at (...)
>     at (...)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     - locked <0x00007f4b5488f2b8> (a java.lang.Object)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
>
> On 16 Jul 2018, at 17:03, Gerard Garcia <ger...@talaia.io> wrote:
>
> Hi Piotr,
>
> I attach the GC pauses logged a while back when the task stopped processing for several hours (it stopped at about 20:05) and a jstack dump from the last time the task hung.
>
> Thanks,
>
> Gerard
>
> On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi Gerard,
>
> I second what Zhijiang wrote. Please check GC pauses, either via GC logging, a 3rd-party tool like jconsole (or some memory profiler), or by enabling resource logging in Flink.
>
> After confirming that this is not the issue, next time this happens, instead of cancelling the job, please collect thread dumps of the process that is stuck.
>
> Piotrek
>
> On 16 Jul 2018, at 13:53, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Gerard,
>
> Thanks for reporting this issue. I'm pulling in Nico and Piotr, who have been working on the networking stack lately and might have some ideas regarding your issue.
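Piotr's two checks (GC pauses and thread dumps) are normally done from outside the process with GC logging, jconsole or jstack. The same kind of data is exposed in-process through the standard java.lang.management API, which the sketch below uses to list threads whose current stack contains a given class and to total the accumulated GC time. The class name StuckThreadProbe is hypothetical and nothing here is a Flink feature; it only illustrates what to look for, for example task threads sitting in org.apache.flink.core.memory.DataOutputSerializer.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class StuckThreadProbe {

    // Returns "name [STATE]" for every live thread whose current stack contains
    // a frame from the given class, e.g. "org.apache.flink.core.memory.DataOutputSerializer".
    public static List<String> threadsInClass(String className) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        List<String> matches = new ArrayList<>();
        // dumpAllThreads(lockedMonitors, lockedSynchronizers) captures every thread's stack,
        // roughly the in-process counterpart of a jstack dump.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            if (info == null) {
                continue;
            }
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().equals(className)) {
                    matches.add(info.getThreadName() + " [" + info.getThreadState() + "]");
                    break;
                }
            }
        }
        return matches;
    }

    // Sums the accumulated collection time (in ms) over all garbage collectors,
    // a coarse version of what GC logging or jconsole would show.
    public static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if not supported by this collector
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("GC time so far: " + totalGcTimeMillis() + " ms");
        // The calling thread matches itself, since threadsInClass is on its stack.
        System.out.println(threadsInClass(StuckThreadProbe.class.getName()));
    }
}
```

Sampling such a probe a few times while the task is stuck would show whether the same thread stays RUNNABLE inside the serializer (consistent with the stacks in this thread) and whether GC time keeps climbing in the meantime.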
>
> Best, Fabian
>
> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>:
> Hi Gerard,
>
> I thought a failed task had triggered the cancel process before; now it is clear to me that you cancel the task when it stops processing data. I think you can jstack the process to find where the task thread is blocked instead of canceling it; then we may find some hints.
>
> In addition, the "DataOutputSerializer.resize" frame in your stack indicates that the task is serializing the record, and the serializer uses extra byte buffers to copy data temporarily. If your record is too large, this may cause an OOM, and this overhead memory is not managed by the Flink framework. You can also monitor the GC status to check for full GC delays.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Friday, July 13, 2018 16:22
> To: wangzhijiang999 <wangzhijiang...@aliyun.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Hi Zhijiang,
>
> The problem is that no other task failed first. We have a task that sometimes just stops processing data, and when we cancel it, we see log messages saying:
>
> "Task (...) did not react to cancelling signal for 30 seconds, but is stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
>
> That is why we suspect that it hangs forever at that point and that this is why it stops processing data. I don't see any increase in heap memory use (I guess because these buffers are managed by Flink), so I'm not sure if that is really the problem.
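Zhijiang's point about temporary copies during DataOutputSerializer.resize can be illustrated with a simplified growable byte buffer. The sketch below assumes a doubling growth strategy (whether Flink's serializer grows exactly like this is an assumption here), and the class GrowableBuffer with its counters is hypothetical. In this sketch the arrays are ordinary heap byte[] allocations, and it records the largest number of bytes held at once, which happens during a resize when the old and the new array are both reachable.

```java
import java.util.Arrays;

public class GrowableBuffer {
    // Largest array size most JVMs will allocate.
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private byte[] buffer;
    private int position;
    private int resizeCount;
    // Largest number of bytes held by the buffer arrays at any moment (old + new during a copy).
    private long peakBytesHeld;

    public GrowableBuffer(int initialCapacity) {
        this.buffer = new byte[initialCapacity];
        this.peakBytesHeld = initialCapacity;
    }

    public void write(byte[] data) {
        ensureCapacity(data.length);
        System.arraycopy(data, 0, buffer, position, data.length);
        position += data.length;
    }

    // Grows by doubling (or to the exact size needed, if that is larger).
    // During Arrays.copyOf both arrays are reachable, so the transient footprint
    // is old + new, roughly 3x the old capacity when doubling.
    private void ensureCapacity(int extra) {
        long required = (long) position + extra;
        if (required <= buffer.length) {
            return;
        }
        if (required > MAX_ARRAY_SIZE) {
            throw new IllegalStateException("record would exceed the maximum array size");
        }
        long newCapacity = Math.min(Math.max(2L * buffer.length, required), MAX_ARRAY_SIZE);
        peakBytesHeld = Math.max(peakBytesHeld, buffer.length + newCapacity);
        buffer = Arrays.copyOf(buffer, (int) newCapacity);
        resizeCount++;
    }

    public int capacity() { return buffer.length; }
    public int size() { return position; }
    public int resizeCount() { return resizeCount; }
    public long peakBytesHeld() { return peakBytesHeld; }

    public static void main(String[] args) {
        GrowableBuffer buf = new GrowableBuffer(64);
        byte[] chunk = new byte[1024];
        for (int i = 0; i < 1024; i++) {
            buf.write(chunk); // 1 MiB in total
        }
        System.out.println("size=" + buf.size() + " capacity=" + buf.capacity()
                + " resizes=" + buf.resizeCount() + " peak=" + buf.peakBytesHeld());
    }
}
```

Writing 1 MiB in 1 KiB chunks into a 64-byte buffer ends with a 1,048,576-byte array after 11 resizes, and the last resize briefly holds 524,288 + 1,048,576 bytes. Under this growth model, serializing one record of size S transiently needs between roughly 1.5x and 3x S, depending on where the final doubling lands, which may help explain why a single oversized record can put the task under memory pressure.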
>
> Gerard
>
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
>
> I think you can check the job manager log to find which task failed first, and then trace the task manager log containing the failed task to find the initial reason.
> The failed task will trigger canceling all the other tasks, and during the canceling process, a blocked task that is waiting for an output buffer cannot be interrupted by the canceler thread, which is what your description shows. So I think the cancel process is not the key point and behaves as expected. Maybe it did not cause an OOM at all.
> If the task cannot be interrupted during canceling, the task manager process will eventually exit to trigger restarting the job.
>
> Zhijiang
> ------------------------------------------------------------------
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Monday, July 2, 2018 18:29
> To: wangzhijiang999 <wangzhijiang...@aliyun.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces belong to the unresponsive task; that is why we suspect that at some point it did not have enough memory to serialize the message and it blocked. I've also found that when it hung, several output buffers were full (see attached image buffers.outPoolUsage.png), so I guess the traces just reflect that.
>
> Probably the task hung for some other reason, and that is what filled the output buffers upstream of the blocked operator. I'll have to continue investigating to find the real cause.
>
> Gerard
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
>
> The stack below only indicates that the task is being canceled, which may have been triggered by the job manager because of another task's failure.
> If the task cannot be interrupted within the configured timeout, the task manager process will exit. Do you see any OutOfMemory messages in the task manager log? Normally the output serialization buffer is managed by the task manager framework and will not cause an OOM, whereas on the input deserialization side there is a temporary byte array on each channel for holding partial records, which is not managed by the framework. I think you can confirm whether an OOM happened and where it was caused. Maybe check the task failure logs.
>
> Zhijiang
>
> ------------------------------------------------------------------
> From: gerardg <ger...@talaia.io>
> Sent: Saturday, June 30, 2018 00:12
> To: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing any kind of log error while other tasks running in the same task manager continue without problems. When these tasks are restarted, the task manager gets killed and shows several errors similar to these:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method:
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
> scala.collection.immutable.List.foreach(List.scala:392)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
> org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> (...)
> org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> java.lang.Thread.run(Thread.java:748)
>
> Our task bundles several thousand messages together, so it creates some big single messages, which could explain why the operator hangs trying to serialize the message. Our problem is that when a task hangs, it is very difficult to detect, and we have to manually cancel and restart it.
>
> Is there any way to make the task manager fail, or to increase the memory available for the allocation?
>
> Thanks, Gerard
> ------------------------------
> Sent from the Apache Flink User Mailing List archive <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at Nabble.com <http://nabble.com/>.
>
> <Heap size.png><G1 Young|Old generation time.png><hang_jstack>
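Gerard's observation that bundling several thousand messages produces one big record, together with Zhijiang's point that every nested field and collection is traversed during serialization, can be made concrete with a byte count. The sketch below uses java.io.DataOutputStream rather than Flink's CaseClassSerializer and TraversableSerializer, whose exact byte layout may differ; the Message class and the makeBundle helper are hypothetical stand-ins for the real case classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class BundleSizeEstimate {

    // Hypothetical stand-in for one bundled message: an int id plus a list of String tags.
    static final class Message {
        final int id;
        final List<String> tags;

        Message(int id, List<String> tags) {
            this.id = id;
            this.tags = tags;
        }
    }

    // Writes the bundle field by field, visiting every nested collection and element,
    // with an int length prefix before each collection.
    static void writeBundle(List<Message> bundle, DataOutputStream out) throws IOException {
        out.writeInt(bundle.size());
        for (Message m : bundle) {
            out.writeInt(m.id);
            out.writeInt(m.tags.size());
            for (String tag : m.tags) {
                out.writeUTF(tag); // 2-byte length + modified UTF-8 bytes
            }
        }
    }

    static int serializedSize(List<Message> bundle) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeBundle(bundle, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Builds a bundle of identical-shaped messages with tags "tag-0", "tag-1", ...
    static List<Message> makeBundle(int messages, int tagsPerMessage) {
        List<Message> bundle = new ArrayList<>();
        for (int i = 0; i < messages; i++) {
            List<String> tags = new ArrayList<>();
            for (int t = 0; t < tagsPerMessage; t++) {
                tags.add("tag-" + t);
            }
            bundle.add(new Message(i, tags));
        }
        return bundle;
    }

    public static void main(String[] args) {
        // Each message: 4 (id) + 4 (list size) + 10 * (2 + 5) = 78 bytes; plus 4 for the bundle size.
        System.out.println(serializedSize(makeBundle(5000, 10))); // 390004
    }
}
```

With 5,000 messages of 10 short tags each, this yields a single 390,004-byte record, and all 50,000 strings must be visited before that one record is fully serialized. Real case classes with more fields and longer strings grow the same way, which is why splitting the bundle reduces the work done per emitted record.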