Thanks for the suggestion. I ran the command and the limit is 1024.
Based on my understanding, the connector to Kafka should not open so many
files. Do you think there is possible socket leakage? BTW, in every batch
which is 5 seconds, I output some results to mysql:
def ingestToMysql(data: Array[String]) {
val url = "jdbc:mysql://localhost:3306/realtime?user=root&password=123"
var sql = "insert into loggingserver1 values "
data.foreach(line => sql += line)
sql = sql.dropRight(1)
sql += ";"
logger.info(sql)
var conn: java.sql.Connection = null
try {
conn = DriverManager.getConnection(url)
val statement = conn.createStatement()
statement.executeUpdate(sql)
} catch {
case e: Exception => logger.error(e.getMessage())
} finally {
if (conn != null) {
conn.close
}
}
}
I am not sure whether the leakage originates from Kafka connector or the
sql connections.
Bill
On Wed, Apr 29, 2015 at 2:12 PM, Ted Yu <[email protected]> wrote:
> Can you run the command 'ulimit -n' to see the current limit ?
>
> To configure ulimit settings on Ubuntu, edit */etc/security/limits.conf*
> Cheers
>
> On Wed, Apr 29, 2015 at 2:07 PM, Bill Jay <[email protected]>
> wrote:
>
>> Hi all,
>>
>> I am using the direct approach to receive real-time data from Kafka in
>> the following link:
>>
>> https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html
>>
>>
>> My code follows the word count direct example:
>>
>>
>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
>>
>>
>>
>> After around 12 hours, I got the following error messages in Spark log:
>>
>> 15/04/29 20:18:10 ERROR JobScheduler: Error generating jobs for time
>> 1430338690000 ms
>> org.apache.spark.SparkException: ArrayBuffer(java.io.IOException: Too
>> many open files, java.io.IOException: Too many open files,
>> java.io.IOException: Too many open files, java.io.IOException: Too many
>> open files, java.io.IOException: Too many open files)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>> at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>> at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>> at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>> at
>> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> at
>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> at
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:239)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:237)
>> at scala.util.Try$.apply(Try.scala:161)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:237)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:174)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:85)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:83)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Thanks for the help.
>>
>> Bill
>>
>
>