Thanks for providing this information, Rainie. Are other issues documented
in the logs besides the TimeoutException in the JM logs which you already
shared? For now, it looks like that there was a connection problem between
the TaskManager and the JobManager that caused the shutdown of the operator
resulting in the NetworkBufferPool to be destroyed. For this scenario I
would expect other failures to occur besides the ones you shared.

Best,
Matthias

On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <raini...@pinterest.com> wrote:

> Thank you Mattias.
> It’s version1.9.
>
> Best regards
> Rainie
>
> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Rainie,
>> the network buffer pool was destroyed for some reason. This happens when
>> the NettyShuffleEnvironment gets closed which is triggered when an operator
>> is cleaned up, for instance. Maybe, the timeout in the metric system caused
>> this. But I'm not sure how this is connected. I'm gonna add Chesnay to this
>> conversation hoping that he can give more insights.
>>
>> If I may ask: What Flink version are you using?
>>
>> Thanks,
>> Matthias
>>
>>
>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <raini...@pinterest.com> wrote:
>>
>>> Hi All,
>>>
>>> Our flink application kept restarting and it did lots of RPC calls to a
>>> dependency service.
>>>
>>> *We saw this exception from failed task manager log: *
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>> Could not forward element to next operator
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>> at
>>> com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
>>> at
>>> com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
>>> at
>>> org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> ... 23 more
>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>>> at
>>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>>> at
>>> org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>> at
>>> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>> ... 32 more
>>>
>>> *We also saw this exception from Job manager log:*
>>> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting
>>>                    - Association to [akka.tcp://flink-metrics@host:38593]
>>> with UID [-1261564990] irrecoverably failed. Quarantining address.
>>> java.util.concurrent.TimeoutException: Remote system has been silent for
>>> too long. (more than 48.0 hours)
>>> at
>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> at
>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> This app has been running fine for a month.
>>> Any suggestion what could cause the issue? Any suggestions on how to
>>> debug it?
>>> Appreciated all advice.
>>>
>>> Thanks
>>> Best regards
>>> Rainie
>>>
>>

Reply via email to