Hi all

Last weekend we upgraded a small cluster (4 brokers) from 0.10.2.1
to 2.1.0. Everything seemed to go well over the weekend.

For two nights in a row we have now seen strange behaviour on one of the 4
nodes. At almost exactly 00:08 on both nights, 1 of the 4 brokers
stopped writing anything to any of its logs for about 7 minutes. Soon
after, the other 3 brokers started issuing exceptions "Connection to 5
was disconnected before the response was read" [1] for all the
partitions for which this broker was the leader. So far so good. What I
would have expected was that this broker would be "taken out of the
cluster" and that another broker would become the leader of the
partitions in question.

But after ~7 minutes, that broker started to log things again (all of
which looked like normal processing) while the other 3 brokers kept
logging "Failed to connect within 30000 ms" [2]. This went on until we
detected the situation in the morning. During this time, all listeners
on partitions led by this broker did not receive any messages (but no
errors either). After a restart of the broker, things normalized and a
flurry of messages that had been sent during the night was received by
the listeners. So the messages had actually been produced and stored on
the faulty broker, just not delivered or replicated.

What we discovered before restarting the broker was that lsof showed
thousands of connections like this:

    java    2977050 user 2579u  sock                0,6        0t0 517152881 can't identify protocol

The total number of open files was very close to 4096 (although the
ulimit of the process was actually 64k).
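
In case it helps, this is roughly how the descriptor usage can be
watched without lsof (just a minimal Python sketch assuming Linux and
/proc; the PID is a placeholder for the broker's JVM process and the
script needs to run as that user or as root):

    import os
    from collections import Counter

    PID = 2977050  # placeholder: the broker's JVM pid from the lsof output above

    fd_dir = "/proc/%d/fd" % PID
    kinds = Counter()
    for fd in os.listdir(fd_dir):
        try:
            target = os.readlink(os.path.join(fd_dir, fd))
        except OSError:
            continue  # fd was closed while we were iterating
        if target.startswith("socket:"):
            kinds["socket"] += 1        # e.g. "socket:[517152881]"
        elif target.startswith("anon_inode:"):
            kinds["anon_inode"] += 1
        else:
            kinds["file/other"] += 1

    print("open fds:", sum(kinds.values()))
    for kind, count in kinds.most_common():
        print("  %s: %d" % (kind, count))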

Our best guess is that something in the JVM (not necessarily Kafka
itself; we're running some monitoring tools in the JVM) did some kind
of housekeeping around midnight, failed to close some connections
properly (hence the many files with "can't identify protocol") and
brought the JVM into a state where it was unable to accept new
connections. It seemed like it could keep the established connection to
Zookeeper, since the node was still registered under /brokers/ids when
we checked in the morning. Still, we were very surprised that this
broker kept working to some extent and that no failover to another
leader was initiated.
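
For completeness, the registration check can also be scripted (a
minimal sketch using the kazoo Python client; the Zookeeper address is
a placeholder, and zookeeper-shell.sh gives the same answer):

    # Check whether broker 5 is still registered in Zookeeper.
    from kazoo.client import KazooClient

    zk = KazooClient(hosts="zookeeper-host:2181")  # placeholder address
    zk.start()
    ids = zk.get_children("/brokers/ids")
    print("registered broker ids:", sorted(ids, key=int))
    if "5" in ids:
        data, stat = zk.get("/brokers/ids/5")  # registration JSON for broker 5
        print("broker 5 registration:", data.decode("utf-8"))
    zk.stop()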

Does anybody have an idea what to look for? Maybe increase some log
levels to get more insight into what's happening?

-- 
CU, Joe

[1]

[2019-03-11 00:10:36,528] WARN [ReplicaFetcher replicaId=4, leaderId=5, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=4, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-24=(offset=149155836, logStartOffset=0, 
maxBytes=1048576, currentLeaderEpoch=Optional[813])}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=519957053, 
epoch=1244473)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 5 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

[2]

[2019-03-11 02:59:51,144] WARN [ReplicaFetcher replicaId=4, leaderId=5, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=4, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-0=(offset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[817]), __consumer_offsets-28=(offset=0, 
logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[873]), 
PR4_CLONE_LIST_1-0=(offset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[567]), __consumer_offsets-4=(offset=860199640, 
logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[868]), 
__consumer_offsets-16=(offset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[879]), __consumer_offsets-36=(offset=0, 
logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[825]), 
__consumer_offsets-12=(offset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[815]), __consumer_offsets-40=(offset=0, 
logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[864]), 
__consumer_offsets-48=(offset=5, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[817]), __consumer_offsets-24=(offset=149155836, 
logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[813])}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=519957053, 
epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)


