Hello All!

I am using Spark 2.3.1 on Kubernetes to run a Structured Streaming job which reads a stream from Kafka, performs some window aggregation, and sinks the output back to Kafka. After the job has been running for a few hours (5-6 hours), the executor pods crash with "Too many open files in system".
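For context, the job has roughly the following shape. This is only a minimal sketch in Scala; the topic names, bootstrap servers, window/watermark durations, and checkpoint path below are placeholders, and the actual code is in the "Code Snip" gist linked further down.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object KafkaWindowAggregation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-window-aggregation")
      .getOrCreate()
    import spark.implicits._

    // Read a stream of records from Kafka (topic name and servers are placeholders).
    val input = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "events-in")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value", "timestamp")

    // Windowed aggregation: count records per key in 1-minute windows,
    // with a 10-minute watermark (both durations are placeholders).
    val counts = input
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "1 minute"), $"key")
      .count()

    // Write the aggregated results back to Kafka; checkpointing goes to HDFS
    // as mentioned in the platform details (path is a placeholder).
    val query = counts
      .selectExpr("CAST(key AS STRING) AS key", "CAST(count AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("topic", "events-out")
      .option("checkpointLocation", "hdfs:///checkpoints/kafka-window-aggregation")
      .outputMode("update")
      .start()

    query.awaitTermination()
  }
}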
Digging in further with the "lsof" command, I can see a lot of UNIX pipes being opened:

# lsof -p 14 | tail
java 14 root *112u a_inode 0,10 0 8838 [eventpoll]
java 14 root *113r FIFO 0,9 0t0 252556158 pipe
java 14 root *114w FIFO 0,9 0t0 252556158 pipe
java 14 root *115u a_inode 0,10 0 8838 [eventpoll]
java 14 root *119r FIFO 0,9 0t0 252552868 pipe
java 14 root *120w FIFO 0,9 0t0 252552868 pipe
java 14 root *121u a_inode 0,10 0 8838 [eventpoll]
java 14 root *131r FIFO 0,9 0t0 252561014 pipe
java 14 root *132w FIFO 0,9 0t0 252561014 pipe
java 14 root *133u a_inode 0,10 0 8838 [eventpoll]

The total count of open fds grows to about 85K (the increased hard ulimit) per pod, and once it hits the hard limit the executor pod crashes. I understand that shuffling needs more fds, but in my case the open fd count keeps growing forever. I am not sure how to estimate how many fds would be adequate, or whether this is a bug. Given that uncertainty, I increased the hard ulimit to a large number (85k), but no luck. It looks like there is a file descriptor leak.

This Spark job runs with the native Kubernetes support as the Spark cluster manager, currently with only two executors, each with 20 cores (request) and 10GB of physical memory (+6GB as memoryOverhead).

Has anyone else seen a similar problem? Thanks for any suggestion.

Error details:

Caused by: java.io.FileNotFoundException: /tmp/blockmgr-b530983c-39be-4c6d-95aa-3ad12a507843/24/temp_shuffle_bf774cf5-fadb-4ca7-a27b-a5be7b835eb6 (Too many open files in system)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

For more of the error log, please see this GitHub gist:
https://gist.github.com/abhisheyke/1ecd952f2ae6af20cf737308a156f566

Some details about the file descriptors (lsof):
https://gist.github.com/abhisheyke/27b073e7ac805ce9e6bb33c2b011bb5a

Code Snip:
https://gist.github.com/abhisheyke/6f838adf6651491bd4f263956f403c74

Platform details:
Kubernetes version: 1.9.2
Docker: 17.3.2
Spark version: 2.3.1
Kafka version: 2.11-0.10.2.1 (both topics have 20 partitions and receive almost 5k records/s)
Hadoop version (using HDFS for checkpointing): 2.7.2

Thank you for any help.

Best Regards,
Abhishek Tripathi