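It looks like you create the KafkaProducerPool in the driver. Each task that
runs in an executor therefore sees a new, empty KafkaProducerPool and creates
its own KafkaProducers, but nothing ever closes those KafkaProducers. Each
unclosed producer keeps its network connections open, which would explain the
growing file descriptor count.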
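
A minimal sketch of the problem and one possible fix follows. The
KafkaProducerPool source was not posted, so LeakyPool is only a guess at its
shape: KafkaProducer is not serializable, so a serializable pool presumably
holds its producers in a transient field. FakeProducer is a hypothetical
stand-in for KafkaProducer so that the example runs on a plain JDK.
ProducerHolder and TaskSimulation are illustrative names, not Spark or Kafka
APIs. The idea is to keep one lazily created producer per executor JVM in a
static field, which is never serialized with the task, and to close it from a
shutdown hook.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for KafkaProducer; counts instances that are still open.
final class FakeProducer implements AutoCloseable {
    static final AtomicInteger OPEN = new AtomicInteger();

    FakeProducer() {
        OPEN.incrementAndGet();
    }

    void send(String topic, String message) {
        // A real producer would write to a broker over a socket here.
    }

    @Override
    public void close() {
        OPEN.decrementAndGet();
    }
}

// Assumed shape of the leaking pool: the producer field is transient, so every
// deserialized copy starts empty and opens a producer that nobody closes.
final class LeakyPool implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient FakeProducer producer;

    FakeProducer borrowProducer() {
        if (producer == null) {
            producer = new FakeProducer();
        }
        return producer;
    }
}

// Possible fix: one producer per JVM, created on first use, closed at shutdown.
final class ProducerHolder {
    private static FakeProducer instance;

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(ProducerHolder::close));
    }

    private ProducerHolder() {
    }

    static synchronized FakeProducer get() {
        if (instance == null) {
            instance = new FakeProducer();
        }
        return instance;
    }

    static synchronized void close() {
        if (instance != null) {
            instance.close();
            instance = null;
        }
    }
}

final class TaskSimulation {
    private TaskSimulation() {
    }

    // Round-trips a value through Java serialization to imitate shipping a
    // task closure from the driver to an executor.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T shipToExecutor(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Number of producers left open after running the given number of tasks
    // against copies of a pool that was created once "in the driver".
    static int producersLeakedBy(int tasks) {
        int before = FakeProducer.OPEN.get();
        LeakyPool driverPool = new LeakyPool();
        for (int i = 0; i < tasks; i++) {
            shipToExecutor(driverPool).borrowProducer().send("topic", "message " + i);
        }
        return FakeProducer.OPEN.get() - before;
    }

    // Number of producers opened when every task uses the per-JVM holder.
    static int producersOpenedWithHolder(int tasks) {
        int before = FakeProducer.OPEN.get();
        for (int i = 0; i < tasks; i++) {
            ProducerHolder.get().send("topic", "message " + i);
        }
        return FakeProducer.OPEN.get() - before;
    }
}
```

In the posted code this would roughly mean obtaining the producer inside
PartitionVoidFunction.call, on the executor, instead of passing a pool that
was built in the driver through the constructor.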

On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2...@gmail.com>
wrote:

>
> Sorry for not including the patch version; it is Spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need any more details.
>
> Thanks
> Nipun
>
>
> The JavaRDDStringKafkaWriter code is here inline:
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.VoidFunction;
> import scala.Tuple2;
>
> import java.io.Serializable;
> import java.util.Iterator;
>
> public class JavaRDDStringKafkaWriter implements Serializable,
>       VoidFunction<JavaRDD<String>> {
>
>    private static final long serialVersionUID = -865193912367180261L;
>    private final KafkaProducerPool pool;
>    private final String topic;
>    private final Boolean kafkaAsync;
>
>    public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic,
>          Boolean kafkaAsync) {
>       this.pool = pool;
>       this.topic = topic;
>       this.kafkaAsync = kafkaAsync;
>    }
>
>    @Override
>    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>       stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>             new RDDKafkaWriter(pool,kafkaAsync), topic));
>    }
>
>    private class PartitionVoidFunction implements
>          VoidFunction<Iterator<String>> {
>
>       private static final long serialVersionUID = 8726871215617446598L;
>       private final RDDKafkaWriter kafkaWriter;
>       private final String topic;
>
>       public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>          this.kafkaWriter = kafkaWriter;
>          this.topic = topic;
>       }
>
>       @Override
>       public void call(Iterator<String> iterator) throws Exception {
>          while (iterator.hasNext()) {
>             kafkaWriter.writeToKafka(topic, iterator.next());
>          }
>       }
>    }
> }
>
>
> The RDDKafkaWriter is here:
>
>
> import java.io.Serializable;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import scala.Tuple2;
>
> public class RDDKafkaWriter implements Serializable {
>
>    private static final long serialVersionUID = 7374381310562055607L;
>    private final KafkaProducerPool pool;
>    private final Boolean async;
>
>    public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>       this.pool = pool;
>       this.async = async;
>
>    }
>
>    public void writeToKafka(String topic, Tuple2<String, String> message) {
>       KafkaProducer<String, String> producer = pool.borrowProducer();
>       ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>             topic, message._1(), message._2());
>       if (async) {
>          producer.send(record);
>       } else {
>          try {
>             producer.send(record).get();
>          } catch (Exception e) {
>             e.printStackTrace();
>          }
>       }
>       pool.returnProducer(producer);
>    }
>
>     public void writeToKafka(String topic, String message) {
>
>         KafkaProducer<String, String> producer = pool.borrowProducer();
>         ProducerRecord<String, String> record =
>                 new ProducerRecord<String, String>(topic, message);
>
>         if (async) {
>             producer.send(record);
>         } else {
>             try {
>                 producer.send(record).get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         pool.returnProducer(producer);
>     }
>
>
> }
>
>
>
>
>
> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Please also include the patch version, such as 1.6.0 or 1.6.1. Could you
>> also post the JAVARDDKafkaWriter code? It is possible that it leaks
>> resources.
>>
>> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>> It is Spark 1.6.
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I am seeing a resource leak: the number of open file descriptors in my
>> Spark Streaming job keeps increasing. We eventually hit a "too many open
>> files" error, raised through an exception in:
>>
>> JAVARDDKafkaWriter, which writes a Spark JavaDStream<String>
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> -------------------------------------------
>> Time: 1485762530000 ms
>> -------------------------------------------
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost): java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084 (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
>> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> at scala.util.Try$.apply(Try.scala:161)
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:228)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084 (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> ... 3 more
>>
>>
>>
>>
