Hey Rainie, Kafka internally attempts to retry topic metadata fetches if possible. If you think the root cause was just due to network congestion or the like, you might want to look into increasing `request.timeout.ms`. Because of the internal retry attempts, however, this exception usually means that the client was unable to connect to the brokers in a more permanent way. The usual culprits are SSL topics with plaintext consumers or vice versa, SSL topics where the client was unable to fetch the necessary credentials, incorrect bootstrap servers configs, and cluster maintenance. Since your job was running successfully for a long time before crashing, I’d check the task manager logs for the period just before the error for any warnings or errors from Kafka to see if they shed light. If you’re dynamically loading SSL certs or using cnames to provide stable bootstrap server identifiers, it can be non-obvious if your config changed mid-operation, but often times some part of the stack will log a warning message or swallowed exception. If you have verbose logging, you might also be logging the Kafka connection properties each time they’re regenerated.
TL;DR: If you think the issue is due to high latency in communicating with your Kafka cluster, increase your configured timeouts. If you think the issue is due to (possibly temporary) dynamic config changes, check your error handling. Retrying the fetch from the Flink side will only help if the cause of the time is very transient. Julian From: Rainie Li <raini...@pinterest.com> Date: Thursday, March 4, 2021 at 1:49 PM To: "matth...@ververica.com" <matth...@ververica.com> Cc: user <user@flink.apache.org>, Chesnay Schepler <ches...@apache.org> Subject: Re: Flink application kept restarting Hi Matthias, Do you have any suggestions to handle timeout issues when fetching data from a Kafka topic? I am thinking of adding a retry logic into flink job, not sure if this is the right direction. Thanks again Best regards Rainie On Wed, Mar 3, 2021 at 12:24 AM Matthias Pohl <matth...@ververica.com<mailto:matth...@ververica.com>> wrote: Hi Rainie, in general buffer pools being destroyed usually mean that some other exception occurred that caused the task to fail and in the process of failure handling the operator-related network buffer is destroyed. That causes the "java.lang.RuntimeException: Buffer pool is destroyed." in your case. It looks like you had some timeout problem while fetching data from a Kafka topic. Matthias On Tue, Mar 2, 2021 at 10:39 AM Rainie Li <raini...@pinterest.com<mailto:raini...@pinterest.com>> wrote: Thanks for checking, Matthias. I have another flink job which failed last weekend with the same buffer pool destroyed error. This job is also running version 1.9. Here is the error I found from the task manager log. Any suggestion what is the root cause and how to fix it? 2021-02-28 00:54:45,943 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - Error while canceling task. java.lang.RuntimeException: Buffer pool is destroyed. at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) at com.pinterest.xenon.unified.api191.SynchronousKafkaConsumer191$1.emitRecordWithTimestamp(SynchronousKafkaConsumer191.java:107) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) -- at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelRead(CreditBasedPartitionRequestClientHandler.java:175) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:107) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:170) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:366) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:307) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline.java:1452) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:329) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:315) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireUserEventTriggered(DefaultChannelPipeline.java:959) at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.lambda$notifyReaderNonEmpty$0(PartitionRequestQueue.java:86) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474) ... 2 more Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata Thanks again! Best regards Rainie On Mon, Mar 1, 2021 at 4:38 AM Matthias Pohl <matth...@ververica.com<mailto:matth...@ververica.com>> wrote: Another question is: The timeout of 48 hours sounds strange. There should have been some other system noticing the connection problem earlier assuming that you have a reasonably low heartbeat interval configured. Matthias On Mon, Mar 1, 2021 at 1:22 PM Matthias Pohl <matth...@ververica.com<mailto:matth...@ververica.com>> wrote: Thanks for providing this information, Rainie. Are other issues documented in the logs besides the TimeoutException in the JM logs which you already shared? For now, it looks like that there was a connection problem between the TaskManager and the JobManager that caused the shutdown of the operator resulting in the NetworkBufferPool to be destroyed. For this scenario I would expect other failures to occur besides the ones you shared. Best, Matthias On Fri, Feb 26, 2021 at 8:28 PM Rainie Li <raini...@pinterest.com<mailto:raini...@pinterest.com>> wrote: Thank you Mattias. It’s version1.9. Best regards Rainie On Fri, Feb 26, 2021 at 6:33 AM Matthias Pohl <matth...@ververica.com<mailto:matth...@ververica.com>> wrote: Hi Rainie, the network buffer pool was destroyed for some reason. This happens when the NettyShuffleEnvironment gets closed which is triggered when an operator is cleaned up, for instance. Maybe, the timeout in the metric system caused this. But I'm not sure how this is connected. I'm gonna add Chesnay to this conversation hoping that he can give more insights. If I may ask: What Flink version are you using? Thanks, Matthias On Fri, Feb 26, 2021 at 8:39 AM Rainie Li <raini...@pinterest.com<mailto:raini...@pinterest.com>> wrote: Hi All, Our flink application kept restarting and it did lots of RPC calls to a dependency service. We saw this exception from failed task manager log: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:179) at com.pinterest.analytics.streaming.logics.ExtractPartnerEventsLogic$$anon$10.flatMap(ExtractPartnerEventsLogic.scala:173) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPeriodicWatermark(AbstractFetcher.java:436) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:402) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208) Caused by: java.lang.RuntimeException: Buffer pool is destroyed. at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:203) at com.pinterest.analytics.streaming.logics.PISLogic$BatchEventsFunction.processElement(PISLogic.scala:189) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) ... 23 more Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213) at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) ... 32 more We also saw this exception from Job manager log: 2021-02-25 21:32:42,874 ERROR akka.remote.Remoting - Association to [akka.tcp://flink-metrics@host:38593] with UID [-1261564990] irrecoverably failed. Quarantining address. java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours) at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) This app has been running fine for a month. Any suggestion what could cause the issue? Any suggestions on how to debug it? Appreciated all advice. Thanks Best regards Rainie