Another question: the 48-hour timeout sounds strange. Assuming you have a reasonably low heartbeat interval configured, some other mechanism should have noticed the connection problem much earlier.
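Here is a rough illustration of the heartbeat argument. A timeout-based detector flags a peer as lost once the peer has been silent for longer than the configured timeout. With an interval and timeout measured in seconds, a broken connection should therefore surface within about a minute, not after 48 hours.

The Java sketch below is hypothetical: `HeartbeatMonitor` is not a Flink class. The 10 s interval and 50 s timeout are assumed example values. Compare them with the `heartbeat.interval` and `heartbeat.timeout` settings your cluster actually uses.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical timeout-based heartbeat failure detector (not Flink's HeartbeatManager). */
final class HeartbeatMonitor {
    private final Duration timeout;
    private Instant lastHeartbeat;

    HeartbeatMonitor(Duration timeout, Instant start) {
        this.timeout = timeout;
        this.lastHeartbeat = start;
    }

    /** Records a heartbeat received from the remote peer at the given time. */
    void onHeartbeat(Instant now) {
        lastHeartbeat = now;
    }

    /** The peer counts as lost once its silence exceeds the timeout. */
    boolean isPeerLost(Instant now) {
        return Duration.between(lastHeartbeat, now).compareTo(timeout) > 0;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2021-02-23T00:00:00Z");
        Duration interval = Duration.ofSeconds(10); // assumed example heartbeat interval
        Duration timeout = Duration.ofSeconds(50);  // assumed example heartbeat timeout
        HeartbeatMonitor monitor = new HeartbeatMonitor(timeout, t0);

        // Heartbeats arrive every interval for one minute, then the connection drops.
        Instant last = t0;
        for (int i = 1; i <= 6; i++) {
            last = t0.plus(interval.multipliedBy(i));
            monitor.onHeartbeat(last);
        }

        // Step forward one second at a time until the loss is detected.
        Instant now = last;
        while (!monitor.isPeerLost(now)) {
            now = now.plusSeconds(1);
        }
        long delay = Duration.between(last, now).getSeconds();
        System.out.println("Loss detected " + delay + " s after the last heartbeat");
        System.out.println("48 h of silence is "
                + Duration.ofHours(48).getSeconds() / timeout.getSeconds() + "x the heartbeat timeout");
    }
}
```

With these example values the program reports detection 51 s after the last heartbeat. It also shows that 48 hours of silence is 3456 times the timeout. That is why a 48-hour silence suggests nothing else was watching that particular connection.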
Matthias

On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <matth...@ververica.com> wrote:

> Thanks for providing this information, Rainie. Are there other issues
> documented in the logs besides the TimeoutException in the JM logs that you
> already shared? For now, it looks like there was a connection problem
> between the TaskManager and the JobManager that caused the shutdown of the
> operator, resulting in the NetworkBufferPool being destroyed. In this
> scenario, I would expect other failures besides the ones you shared.
>
> Best,
> Matthias
>
> On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <raini...@pinterest.com> wrote:
>
>> Thank you, Matthias.
>> It's version 1.9.
>>
>> Best regards
>> Rainie
>>
>> On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Rainie,
>>> the network buffer pool was destroyed for some reason. This happens when
>>> the NettyShuffleEnvironment gets closed, which is triggered, for instance,
>>> when an operator is cleaned up. Maybe the timeout in the metric system
>>> caused this, but I'm not sure how the two are connected. I'm going to add
>>> Chesnay to this conversation, hoping that he can give more insight.
>>>
>>> If I may ask: What Flink version are you using?
>>>
>>> Thanks,
>>> Matthias
>>>
>>>
>>> On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <raini...@pinterest.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Our Flink application kept restarting, and it made lots of RPC calls to
>>>> a dependency service.
>>>>
>>>> *We saw this exception in the failed task manager's log:*
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>     at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179)
>>>>     at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173)
>>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402)
>>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)
>>>> Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>     at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203)
>>>>     at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189)
>>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>     ... 23 more
>>>> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
>>>>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
>>>>     at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
>>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
>>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>>>>     ... 32 more
>>>>
>>>> *We also saw this exception in the job manager log:*
>>>> 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting - Association to [akka.tcp://flink-metrics@host:38593] with UID [-1261564990] irrecoverably failed. Quarantining address.
>>>> java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
>>>>     at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>     at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> This app had been running fine for a month.
>>>> Any suggestions on what could cause the issue, or on how to debug it?
>>>> All advice is appreciated.
>>>>
>>>> Thanks
>>>> Best regards
>>>> Rainie
>>>>
>>>
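The task manager trace ends in `LocalBufferPool.requestMemorySegment` throwing `IllegalStateException: Buffer pool is destroyed.` That call is reached through `RecordWriter` and `ResultPartition`, and the exception is re-thrown as a `RuntimeException` with the same message at `RecordWriterOutput`. The pattern is general: once a pool has been destroyed, any later buffer request fails instead of returning a buffer.

The Java sketch below reproduces that chain under simplifying assumptions. `SimpleBufferPool`, `SimpleRecordWriter` and `BufferPoolDemo` are hypothetical illustrations, not Flink's `LocalBufferPool` or `RecordWriterOutput` implementations.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

/** Hypothetical, simplified buffer pool (not Flink's LocalBufferPool). */
final class SimpleBufferPool {
    private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();
    private boolean destroyed = false;

    SimpleBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(ByteBuffer.allocate(bufferSize));
        }
    }

    /** Hands out a buffer, failing fast once the pool has been destroyed. */
    synchronized ByteBuffer requestBuffer() {
        if (destroyed) {
            throw new IllegalStateException("Buffer pool is destroyed.");
        }
        ByteBuffer buffer = available.poll();
        if (buffer == null) {
            // A real network buffer pool would block here until a buffer is recycled.
            throw new IllegalStateException("No buffer available.");
        }
        return buffer;
    }

    /** Returns a buffer to the pool unless the pool is already gone. */
    synchronized void recycle(ByteBuffer buffer) {
        if (!destroyed) {
            buffer.clear();
            available.add(buffer);
        }
    }

    /** Invoked on shutdown, e.g. when the owning network environment is closed. */
    synchronized void destroy() {
        destroyed = true;
        available.clear();
    }
}

/** Hypothetical writer that wraps pool failures, mirroring the RecordWriterOutput frame. */
final class SimpleRecordWriter {
    private final SimpleBufferPool pool;

    SimpleRecordWriter(SimpleBufferPool pool) {
        this.pool = pool;
    }

    void emit(byte[] record) {
        ByteBuffer buffer;
        try {
            buffer = pool.requestBuffer();
        } catch (IllegalStateException e) {
            // Re-throw with the same message, giving the
            // "RuntimeException ... Caused by: IllegalStateException" pair seen in the log.
            throw new RuntimeException(e.getMessage(), e);
        }
        buffer.put(record);
        // Sending the buffer over the network is omitted; recycle it right away.
        pool.recycle(buffer);
    }
}

final class BufferPoolDemo {
    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(4, 1024);
        SimpleRecordWriter writer = new SimpleRecordWriter(pool);

        writer.emit("event-1".getBytes(StandardCharsets.UTF_8)); // succeeds

        pool.destroy(); // the task is being torn down for some other reason

        try {
            writer.emit("event-2".getBytes(StandardCharsets.UTF_8));
        } catch (RuntimeException e) {
            System.out.println(e + " | caused by " + e.getCause());
        }
    }
}
```

The demo prints `java.lang.RuntimeException: Buffer pool is destroyed. | caused by java.lang.IllegalStateException: Buffer pool is destroyed.` The message says nothing about why the pool was destroyed. As Matthias notes above, the actual cause has to be found in whatever triggered the shutdown, in earlier log entries.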