Is the function ingestToMysql running on the driver or on the executors?
Depending on the answer, you can debug the job while it runs in distributed
mode, once with and once without calling the function.
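
One way to compare the two runs is to log how many file descriptors the JVM
holds on every batch. The following is only a minimal sketch: FdProbe,
fdCounts and describe are hypothetical names, and it assumes a Unix-like JVM
that exposes com.sun.management.UnixOperatingSystemMXBean.

```scala
import java.lang.management.ManagementFactory
import com.sun.management.UnixOperatingSystemMXBean

// Hypothetical helper (not part of Spark): reports file descriptor usage
// of the JVM it is called in, so it can be used on the driver or an executor.
object FdProbe {
  /** Returns (open, max) file descriptor counts for this JVM, or None if the
    * platform MXBean does not expose them (for example on Windows). */
  def fdCounts(): Option[(Long, Long)] =
    ManagementFactory.getOperatingSystemMXBean match {
      case unix: UnixOperatingSystemMXBean =>
        Some((unix.getOpenFileDescriptorCount, unix.getMaxFileDescriptorCount))
      case _ => None
    }

  /** Formats the counts as a single log line prefixed with `tag`. */
  def describe(tag: String): String = fdCounts() match {
    case Some((open, max)) => s"[$tag] open file descriptors: $open of $max"
    case None              => s"[$tag] file descriptor counts not available on this JVM"
  }
}
```

Logging FdProbe.describe("driver") once per batch on the driver, and the same
call with a different tag inside the code that runs on the executors, should
show which process has a steadily growing count.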

If you don't get "too many open files" without calling ingestToMysql(), the
problem is likely to be in ingestToMysql().
If you get the problem even without calling ingestToMysql(), then the
problem may be in the Kafka integration. If the problem is occurring in the
driver, then it is in the DirectKafkaInputDStream code. If the problem is
occurring in the executors, then it is in KafkaRDD.
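
If ingestToMysql() does turn out to be the source, closing the Statement as
well as the Connection is worth trying. Here is a minimal sketch of that
idea, not a tested fix: JdbcSketch, using and executeUpdate are hypothetical
names, and error handling and logging are left to the caller.

```scala
import java.sql.DriverManager

// Hypothetical helper (not from the original code): a small loan pattern
// that guarantees close() is called on JDBC resources.
object JdbcSketch {
  /** Runs `body` with `resource`, then closes the resource even if `body` throws. */
  def using[R <: AutoCloseable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally if (resource != null) resource.close()

  /** Executes one update; the Statement is closed first, then the Connection. */
  def executeUpdate(url: String, sql: String): Int =
    using(DriverManager.getConnection(url)) { conn =>
      using(conn.createStatement()) { statement =>
        statement.executeUpdate(sql)
      }
    }
}
```

With this in place, the body of ingestToMysql() could call
JdbcSketch.executeUpdate(url, sql) inside its existing try/catch.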

TD

On Wed, Apr 29, 2015 at 2:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Maybe add statement.close() in the finally block?
>
> Streaming / Kafka experts may have better insight.
>
> On Wed, Apr 29, 2015 at 2:25 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Thanks for the suggestion. I ran the command and the limit is 1024.
>>
>> Based on my understanding, the connector to Kafka should not open so many
>> files. Do you think there could be a socket leak? BTW, in every batch,
>> which is 5 seconds long, I output some results to MySQL:
>>
>>   def ingestToMysql(data: Array[String]) {
>>     val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
>>     var sql = "insert into loggingserver1 values "
>>     data.foreach(line => sql += line)
>>     sql = sql.dropRight(1)
>>     sql += ";"
>>     logger.info(sql)
>>     var conn: java.sql.Connection = null
>>     try {
>>       conn = DriverManager.getConnection(url)
>>       val statement = conn.createStatement()
>>       statement.executeUpdate(sql)
>>     } catch {
>>       case e: Exception => logger.error(e.getMessage())
>>     } finally {
>>       if (conn != null) {
>>         conn.close
>>       }
>>     }
>>   }
>>
>> I am not sure whether the leak originates from the Kafka connector or the
>> SQL connections.
>>
>> Bill
>>
>> On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Can you run the command 'ulimit -n' to see the current limit?
>>>
>>> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
>>> Cheers
>>>
>>> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am using the direct approach to receive real-time data from Kafka, as
>>>> described at the following link:
>>>>
>>>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>>>
>>>>
>>>> My code follows the word count direct example:
>>>>
>>>>
>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>>>
>>>>
>>>>
>>>> After around 12 hours, I got the following error messages in the Spark log:
>>>>
>>>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time 1430338690000 ms
>>>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files, java.io.IOException: Too many open files)
>>>>         at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>>>>         at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>         at scala.Option.orElse(Option.scala:257)
>>>>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>         at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>         at scala.Option.orElse(Option.scala:257)
>>>>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>         at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>         at scala.Option.orElse(Option.scala:257)
>>>>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>         at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>         at scala.Option.orElse(Option.scala:257)
>>>>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>         at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>>>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>>>>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>>>>         at scala.Option.orElse(Option.scala:257)
>>>>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>>>>         at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>>>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>>>         at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:239)
>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:237)
>>>>         at scala.util.Try$.apply(Try.scala:161)
>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:237)
>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:174)
>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:85)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:83)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Thanks for the help.
>>>>
>>>> Bill
>>>>
>>>
>>>
>>
>
