Use lsof to see which files are actually being held open.

That stack trace looks to me like it's from the driver, not the executors. Where in foreachRDD is it being called? The outermost portion of foreachRDD runs in the driver; the innermost portion runs in the executors. From the docs:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record)  // executed at the worker
      }
    }

@td I've specifically looked at Kafka socket connections for the standard 1.3 code vs. my branch that has cached connections. The standard non-caching code has very short-lived connections. I've had jobs running for a month at a time, including ones writing to MySQL. Not saying it's impossible, but I'd think we need some evidence before speculating this has anything to do with it.

On Wed, Apr 29, 2015 at 6:50 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

> This function is called in foreachRDD. I think it should be running in the
> executors. I added statement.close() to the code and it is running. I
> will let you know if this fixes the issue.
>
> On Wed, Apr 29, 2015 at 4:06 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Is the function ingestToMysql running on the driver or on the executors?
>> Accordingly you can try debugging while running in a distributed manner,
>> with and without calling the function.
>>
>> If you don't get "too many open files" without calling ingestToMysql(),
>> the problem is likely to be in ingestToMysql().
>> If you get the problem even without calling ingestToMysql(), then the
>> problem may be in Kafka. If the problem is occurring in the driver, then it's
>> the DirectKafkaInputDStream code. If the problem is occurring in the
>> executor, then the problem is in KafkaRDD.
>>
>> TD
>>
>> On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Maybe add statement.close() in a finally block?
>>>
>>> Streaming / Kafka experts may have better insight.
>>>
>>> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>>>
>>>> Based on my understanding, the connector to Kafka should not open so
>>>> many files. Do you think there is possible socket leakage? BTW, in every
>>>> batch which is 5 seconds, I output some results to mysql:
>>>>
>>>>   def ingestToMysql(data: Array[String]) {
>>>>     val url =
>>>>       "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>>>     var sql = "insert into loggingserver1 values "
>>>>     data.foreach(line => sql += line)
>>>>     sql = sql.dropRight(1)
>>>>     sql += ";"
>>>>     logger.info(sql)
>>>>     var conn: java.sql.Connection = null
>>>>     try {
>>>>       conn = DriverManager.getConnection(url)
>>>>       val statement = conn.createStatement()
>>>>       statement.executeUpdate(sql)
>>>>     } catch {
>>>>       case e: Exception => logger.error(e.getMessage())
>>>>     } finally {
>>>>       if (conn != null) {
>>>>         conn.close
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>> I am not sure whether the leakage originates from Kafka connector or
>>>> the sql connections.
>>>>
>>>> Bill
>>>>
>>>> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Can you run the command 'ulimit -n' to see the current limit ?
>>>>>
>>>>> To configure ulimit settings on Ubuntu, edit
>>>>> */etc/security/limits.conf*
>>>>> Cheers
>>>>>
>>>>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I am using the direct approach to receive real-time data from Kafka
>>>>>> in the following link:
>>>>>>
>>>>>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>>>>>
>>>>>> My code follows the word count direct example:
>>>>>>
>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>>>>>
>>>>>> After around 12 hours, I got the following error messages in Spark
>>>>>> log:
>>>>>>
>>>>>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 1430338690000 ms
>>>>>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files)
>>>>>>     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>>>     at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>>>     at scala.Option.orElse(Option.scala:257)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>>>     at scala.Option.orElse(Option.scala:257)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>>>     at scala.Option.orElse(Option.scala:257)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>>>     at scala.Option.orElse(Option.scala:257)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>>>     at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>>>     at scala.Option.orElse(Option.scala:257)
>>>>>>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>>>>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>>>>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:239)
>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:237)
>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:237)
>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:174)
>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:85)
>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:83)
>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> Thanks for the help.
>>>>>>
>>>>>> Bill
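
For reference, Ted Yu's suggestion of closing the statement in a finally block could be sketched roughly as below. This is only an illustration under stated assumptions, not code from the thread: the MysqlIngest object, the using helper, buildInsert, and the url/rows parameters are hypothetical names, and rows is assumed to hold pre-formatted value tuples such as "(1,'a')" without the trailing commas that the quoted ingestToMysql relied on. Unlike the quoted version, exceptions propagate to the caller instead of being logged and swallowed.

```scala
import java.sql.DriverManager

object MysqlIngest {
  // Hypothetical helper: runs `body` with the resource and always closes it,
  // even if `body` throws. Connection and Statement are both AutoCloseable.
  def using[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource) finally resource.close()

  // Builds one multi-row INSERT. Assumes each row is already a value tuple
  // like "(1,'a')" with no trailing comma.
  def buildInsert(table: String, rows: Seq[String]): String =
    rows.mkString("insert into " + table + " values ", ",", ";")

  // The statement is closed first, then the connection, on every code path.
  def ingestToMysql(url: String, rows: Seq[String]): Unit =
    if (rows.nonEmpty) {
      using(DriverManager.getConnection(url)) { conn =>
        using(conn.createStatement()) { stmt =>
          stmt.executeUpdate(buildInsert("loggingserver1", rows))
        }
      }
    }
}
```

Whether this changes anything for the "Too many open files" error depends on where the descriptors are actually leaking, which is what lsof on the driver and executor processes should show.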