I guess it was happening because I canceled the old job and started it again. When I restarted my cluster it stopped throwing the error. But I am still not sure from which metric I can infer whether backpressure is happening.
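In case it helps someone else, below is a minimal sketch of what I am trying now to watch for backpressure: it scrapes the Prometheus reporter endpoint of one TaskManager (assuming the reporter is enabled; the host/port localhost:9249 is just my local setup) and prints every outPoolUsage gauge that is close to 1.0. As far as I understand, a sender whose output buffer pool stays constantly full is the usual sign of backpressure; the 0.99 threshold is only my own guess, not something official.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch: scrape a TaskManager's Prometheus endpoint and flag outPoolUsage
// gauges that are (almost) at 1.0, i.e. output buffer pools constantly full.
// localhost:9249 is just my local reporter setup.
public class OutPoolUsageCheck {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:9249/metrics");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Prometheus text format: <name>{<labels>} <value>
                if (line.startsWith("#") || !line.contains("outPoolUsage")) {
                    continue;
                }
                double value = Double.parseDouble(
                        line.substring(line.lastIndexOf(' ') + 1));
                if (value >= 0.99) { // arbitrary threshold I picked
                    System.out.println("possibly backpressured: " + line);
                }
            }
        }
    }
}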
Felipe
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*

On Wed, Nov 6, 2019 at 4:30 PM Zhijiang <wangzhijiang...@aliyun.com> wrote:
> Hi Felipe,
>
> "Buffer pool is destroyed" is mainly caused by canceling a task. That
> means some other task failed, which triggered the job master to cancel
> all the tasks in the topology.
> So if you want to find the root cause, it is best to check the job master
> log for the first failure, which triggers the subsequent cancel
> operations.
>
> In addition, which Flink version are you using?
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> Send Time: 2019 Nov. 6 (Wed.) 19:12
> To: user <user@flink.apache.org>
> Subject: What metrics can I see the root cause of "Buffer pool is
> destroyed" message?
>
> Hi community,
>
> Looking at the code [1], it seems to be related to having no
> availableMemorySegments left. I am looking at several metrics, but none
> of them has helped me understand where I can measure the root cause of
> this error message.
>
> - flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does
> not seem to point to a related cause.
> - flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength:
> I see my reducer operator always with a queue length of 4. The
> pre-aggregate task sometimes goes to 3, but only a few times.
> - flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and
> flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength
> show my source task at 100% several times. But my error message comes
> from the pre-aggregate task.
> - flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond
> DOES show that the pre-aggregate task is consuming a lot. But with which
> metric can I relate this to know in advance how much is "a lot"?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265
>
> Thanks for your suggestions, and here is my stack trace:
>
> java.lang.RuntimeException: Buffer pool is destroyed.
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
> at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
> at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
> at java.util.TimerThread.mainLoop(Timer.java:555)
> at java.util.TimerThread.run(Timer.java:505)
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
> at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>
> *--*
> *-- Felipe Gutierrez*
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*