Hi all, last weekend we upgraded a small cluster (4 brokers) from 0.10.2.1 to 2.1.0. Everything seemed to go well over the weekend.
For two nights in a row we have now seen strange behaviour on one of the 4 nodes. At almost exactly 00:08 on both nights, 1 of the 4 brokers stopped writing anything to any of its logs for about 7 minutes. Soon after, the other 3 brokers started logging the exception "Connection to 5 was disconnected before the response was read" [1] for all the partitions this broker was the leader of. So far so good.

What I would have expected is that this broker would be taken out of the cluster and another broker would become the leader of the partitions in question. Instead, after ~7 minutes the broker started logging again (everything looked like normal processing), while the other 3 brokers kept logging "Failed to connect within 30000 ms" [2]. This went on until we noticed the situation in the morning. During that time, the listeners (consumers) on partitions led by this broker did not receive any messages, but no errors either. After restarting the broker, things normalized and a flurry of messages that had been produced during the night was received by the listeners. So the messages had actually been produced and stored on the misbehaving broker, just not delivered and not replicated.

What we discovered before restarting the broker was that lsof showed thousands of connections like

java 2977050 user 2579u sock 0,6 0t0 517152881 can't identify protocol

The total number of open files was very close to 4096 (although the ulimit of the process was actually 64k).

Our best guess is that something in the JVM (not necessarily Kafka itself; we run some monitoring tools in the same JVM) did some kind of housekeeping around midnight, failed to close some connections properly (hence the many files with "can't identify protocol"), and brought the JVM into a state where it could no longer accept new connections. It apparently did hold on to the established connection to ZooKeeper, since the node was still registered under /brokers/ids when we checked in the morning.

Still, we were very surprised that this broker kept working to some extent and that no failover to another leader was initiated. Does anybody have an idea what to look for? Maybe increase some log levels to get more insight into what's happening?
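In case it helps, this is roughly how we counted the leaked sockets on the affected broker. It is just a quick Python sketch, assuming lsof is available and using the broker PID from the lsof line above; the "can't identify protocol" string is what lsof prints for sockets it cannot classify.

#!/usr/bin/env python3
# Rough sketch: count open files and "can't identify protocol" sockets
# of a given process by parsing lsof output. PID 2977050 is the broker
# PID from the lsof line quoted above; pass a different PID as argv[1].
import subprocess
import sys

def count_leaked_sockets(pid):
    # lsof -p <pid> lists all open files of the process; leaked sockets
    # show up with "can't identify protocol" in the NAME column.
    out = subprocess.run(["lsof", "-p", str(pid)],
                         capture_output=True, text=True, check=True).stdout
    lines = out.splitlines()
    leaked = [l for l in lines if "can't identify protocol" in l]
    return len(lines) - 1, len(leaked)  # subtract the lsof header line

if __name__ == "__main__":
    pid = int(sys.argv[1]) if len(sys.argv) > 1 else 2977050
    total, leaked = count_leaked_sockets(pid)
    print("open files: %d, 'can't identify protocol' sockets: %d" % (total, leaked))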
--
CU, Joe

[1]
[2019-03-11 00:10:36,528] WARN [ReplicaFetcher replicaId=4, leaderId=5, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=4, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-24=(offset=149155836, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[813])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=519957053, epoch=1244473)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 5 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

[2]
[2019-03-11 02:59:51,144] WARN [ReplicaFetcher replicaId=4, leaderId=5, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=4, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-0=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[817]), __consumer_offsets-28=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[873]), PR4_CLONE_LIST_1-0=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[567]), __consumer_offsets-4=(offset=860199640, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[868]), __consumer_offsets-16=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[879]), __consumer_offsets-36=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[825]), __consumer_offsets-12=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[815]), __consumer_offsets-40=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[864]), __consumer_offsets-48=(offset=5, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[817]), __consumer_offsets-24=(offset=149155836, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[813])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=519957053, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)