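Help!
My Flink job restarts every so often. When it restarts, Bytes Received in the web UI is around 600 to 700 KB, and of the 40 tasks, 1 is Failed and 39 are Canceled; then the job restarts. I can't tell what is going wrong. How can I fix this?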


The Root Exception is as follows:


2021-04-12 10:29:03
java.lang.Exception
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getLastOffset(LegacyFetcher.java:759)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.getMissingOffsetsFromKafka(LegacyFetcher.java:712)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:462)
