Hi all,

We're running a Kafka cluster with several Samza jobs consuming and producing messages, as well as a Kafka Connect job that consumes messages and stores them in a database table.
From time to time, at random intervals (sometimes the cluster runs for days without issues, sometimes only a few hours), our entire cluster becomes unresponsive because one broker goes down. On the broker itself no errors are found, but we do notice that the number of open connections (in TIME_WAIT state) starts to increase, as does the number of open file descriptors. This goes on until we receive a 'java.io.IOException: Too many open files', which is the first error/exception that occurs on the unresponsive broker. On the other brokers, this stack trace can be found in the logs:

Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@2881fb15 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 247 was disconnected before the response was read
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
        at scala.Option.foreach(Option.scala:257)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

There are a few questions on my mind :):

1. Do you know why the broker itself is becoming unresponsive? Do you perhaps have a direction in mind for us to investigate further?
2. Do you know why the entire cluster becomes unresponsive, instead of dropping the unresponsive broker and electing new partition leaders?
3. Is there a way we can prevent the cluster from becoming unresponsive when this occurs in the future?

Thank you in advance!
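In case it helps to see how the descriptor growth can be observed, here is a minimal sketch of a JVM process reporting its own open file descriptor count (illustrative only, not our actual monitoring; the class name FdMonitor is made up, and the cast assumes a HotSpot/OpenJDK JVM that exposes com.sun.management.UnixOperatingSystemMXBean; to watch the broker itself, the same MXBean attributes can be read remotely over JMX):

    import java.lang.management.ManagementFactory;
    import java.lang.management.OperatingSystemMXBean;
    import com.sun.management.UnixOperatingSystemMXBean;

    public class FdMonitor {
        public static void main(String[] args) throws InterruptedException {
            OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
            if (!(os instanceof UnixOperatingSystemMXBean)) {
                System.err.println("Open FD count is not exposed on this JVM/OS");
                return;
            }
            UnixOperatingSystemMXBean unixOs = (UnixOperatingSystemMXBean) os;
            while (true) {
                // Log how many file descriptors the current JVM has open
                // versus its configured maximum, every 10 seconds.
                System.out.printf("open fds: %d / max fds: %d%n",
                        unixOs.getOpenFileDescriptorCount(),
                        unixOs.getMaxFileDescriptorCount());
                Thread.sleep(10_000L);
            }
        }
    }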
Kind regards,

Robert Žuljević
Software Developer, Levi9