I guess it was happening because I canceled the old job and started it
again. When I restarted my cluster it stoped to throw the error.
But I still not sure which metric I can infer if backpressure is happening.

Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Wed, Nov 6, 2019 at 4:30 PM Zhijiang <wangzhijiang...@aliyun.com> wrote:

> Hi Felipe,
>
> "Buffer pool is destroyed" is mainly caused by canceling task. That means
> there are other tasks failure which would trigger canceling all the
> topology tasks by job master.
> So if you want to find the root cause, it is proper to check the job
> master log to find the first failure which triggers the following cancel
> operations.
>
> In addition, which flink version are you using?
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From:Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> Send Time:2019 Nov. 6 (Wed.) 19:12
> To:user <user@flink.apache.org>
> Subject:What metrics can I see the root cause of "Buffer pool is
> destroyed" message?
>
> Hi community,
>
> Looking at the code [1] it seems that it is related to not have
> availableMemorySegments anymore. I am looking at several metrics but it
> hasn't seemed to help me understand where I can measure the root cause of
> this error message.
>
> - flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does not
> seem to give me a related cause.
> - flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength
> I see my reducer operator always with queue lenght equal 4. Pre-aggregate
> task sometimes goes to 3 but it goes only few times.
> - flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and
> flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength
> shows my source task several times in 100%. But my error message comes from
> the pre-aggregate task.
> -
> flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond
> DOES show the the pre-aggregate task is consuming a lot. But with which
> metric can I relate this to know in advance how much is a lot?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265
>
> Thanks for your suggestions and here is my stack trace:
>
> java.lang.RuntimeException: Buffer pool is destroyed.
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
> at
> org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
> at
> org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
> at java.util.TimerThread.mainLoop(Timer.java:555)
> at java.util.TimerThread.run(Timer.java:505)
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
> at
> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
> at
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
>

Reply via email to