It's a producer pool: borrowProducer() takes an existing Kafka producer
object if one is free, or creates one if all are in use.
Shouldn't we reuse Kafka producer objects for writing to Kafka?
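
To make the borrow-or-create behaviour concrete, here is a minimal sketch
(not the attached class). BorrowPool, FakeProducer, giveBack and createdCount
are hypothetical names; FakeProducer stands in for
KafkaProducer<String, String> so the example runs without Kafka on the
classpath.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical generic pool; in the real job T would be KafkaProducer<String, String>.
class BorrowPool<T extends AutoCloseable> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;
    private final AtomicInteger created = new AtomicInteger();

    BorrowPool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuse an idle object if one exists, otherwise create a new one.
    T borrow() {
        T obj = idle.poll();
        if (obj == null) {
            obj = factory.get();
            created.incrementAndGet();
        }
        return obj;
    }

    // Hand the object back so the next borrow() can reuse it.
    void giveBack(T obj) {
        if (obj != null) {
            idle.offer(obj);
        }
    }

    int createdCount() {
        return created.get();
    }

    // Close every idle object; objects that were never given back are not reached.
    void closeAll() {
        T obj;
        while ((obj = idle.poll()) != null) {
            try {
                obj.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

class BorrowPoolDemo {
    // Stand-in for a Kafka producer (assumption, for illustration only).
    static class FakeProducer implements AutoCloseable {
        boolean closed = false;

        void send(String message) {
            // A real producer would write to Kafka here.
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        BorrowPool<FakeProducer> pool = new BorrowPool<>(FakeProducer::new);
        for (int i = 0; i < 3; i++) {
            FakeProducer p = pool.borrow();
            try {
                p.send("message " + i);
            } finally {
                pool.giveBack(p); // always return, even if send() throws
            }
        }
        System.out.println("producers created: " + pool.createdCount());
        pool.closeAll();
    }
}
```

With sequential borrow/return calls only one producer is created. The
try/finally is the part that matters: a producer that is borrowed but never
given back cannot be closed by closeAll().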

@Ryan, can you suggest a good solution for writing a DStream to Kafka that
can be used in production?

I am attaching the Kafka producer pool class. Where would one issue a call
to close()?

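import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaProducerPool implements Serializable {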

   private static final long serialVersionUID = -1913028296093224674L;

   private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;

   private ScheduledExecutorService executorService;

   private final Properties properties;

   private final int minIdle;

   /**
    * Creates the pool.
    *
    * @param minIdle
    *            minimum number of objects residing in the pool
    */
   public KafkaProducerPool(final int minIdle, final Properties properties) {
      // initialize pool
      this.properties = properties;
      this.minIdle = minIdle;
      initialize();

   }

   /**
    * Creates the pool.
    *
    * @param minIdle
    *            minimum number of objects residing in the pool
    * @param maxIdle
    *            maximum number of objects residing in the pool
    * @param validationInterval
    *            time in seconds for periodical checking of minIdle / maxIdle
    *            conditions in a separate thread. When the number of objects is
    *            less than minIdle, missing instances will be created. When the
    *            number of objects is greater than maxIdle, too many instances
    *            will be removed.
    */
   public KafkaProducerPool(final int minIdle, final int maxIdle,
         final long validationInterval, final Properties properties) {
      // initialize pool
      this.properties = properties;
      this.minIdle = minIdle;
      initialize();

      // check pool conditions in a separate thread
      executorService = Executors.newSingleThreadScheduledExecutor();
      executorService.scheduleWithFixedDelay(new Runnable() {
         @Override
         public void run() {
            int size = pool.size();
            if (size < minIdle) {
               int sizeToBeAdded = minIdle - size;
               for (int i = 0; i < sizeToBeAdded; i++) {
                  pool.add(createProducer());
               }
            } else if (size > maxIdle) {
               int sizeToBeRemoved = size - maxIdle;
               for (int i = 0; i < sizeToBeRemoved; i++) {
                  pool.poll();
               }
            }
         }
      }, validationInterval, validationInterval, TimeUnit.SECONDS);
   }

   /**
    * Gets the next free producer from the pool. If the pool is empty, a new
    * producer is created and returned to the caller.
    *
    * @return the borrowed producer
    */
   public synchronized KafkaProducer<String, String> borrowProducer() {
      if (pool == null)
         initialize();
      KafkaProducer<String, String> object;
      if ((object = pool.poll()) == null) {
         object = createProducer();
      }

      return object;
   }

   /**
    * Returns a producer to the pool.
    *
    * @param producer
    *            producer to be returned
    */
   public void returnProducer(KafkaProducer<String, String> producer) {
      if (producer == null) {
         return;
      }
      this.pool.offer(producer);
   }

   /**
    * Shutdown this pool.
    */
   public void shutdown() {
      if (executorService != null) {
         KafkaProducer<String, String> producer;
         while ((producer = pool.poll()) != null) {
            producer.close();
         }
         executorService.shutdown();
      }
   }

   /**
    * Creates a new producer.
    *
    * @return T new object
    */
   private KafkaProducer<String, String> createProducer() {
      KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
      return producer;
   }

   private void initialize() {
      pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();

      for (int i = 0; i < minIdle; i++) {
         pool.add(createProducer());
      }
   }

   public void closeAll() {
      KafkaProducer<String, String> object;
      while ((object = pool.poll()) != null) {
         //object.flush();
         object.close();
      }
   }
}
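
One detail in the class above is worth checking in isolation: the pool field
is transient, so Java serialization does not carry it along, and a
deserialized copy starts with pool == null until borrowProducer() calls
initialize() again. The sketch below uses a hypothetical Holder class with a
queue of strings in place of producers; it only demonstrates the
serialization behaviour and assumes, as Ryan suggests, that the executor
works on a deserialized copy of the pool.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;

class TransientPoolDemo {

    // Hypothetical stand-in for KafkaProducerPool: strings replace producers.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        transient ConcurrentLinkedQueue<String> pool = new ConcurrentLinkedQueue<>();
    }

    // Serialize and deserialize in memory, roughly what shipping an object to another JVM does.
    static Holder roundTrip(Holder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder driverSide = new Holder();
        driverSide.pool.add("producer-1");

        Holder executorSide = roundTrip(driverSide);

        System.out.println("driver pool size: " + driverSide.pool.size());
        System.out.println("executor pool is null: " + (executorSide.pool == null));
    }
}
```

If that is what happens in the job, every deserialized copy builds its own
producers, and a closeAll() on the driver's copy never reaches them.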

Thanks
Nipun

On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> Looks like you create KafkaProducerPool in the driver. So when the task is
> running in the executor, it will always see a new empty KafkaProducerPool
> and create KafkaProducers. But nobody closes these KafkaProducers.
>
> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2...@gmail.com>
> wrote:
>
>
> Sorry for not including the patch version; it's Spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need any more details.
>
> Thanks
> Nipun
>
>
> The JavaRDDKafkaWriter code is here inline:
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.VoidFunction;
> import scala.Tuple2;
>
> import java.io.Serializable;
> import java.util.Iterator;
>
> public class JavaRDDStringKafkaWriter implements Serializable, VoidFunction<JavaRDD<String>> {
>
>    private static final long serialVersionUID = -865193912367180261L;
>    private final KafkaProducerPool pool;
>    private final String topic;
>    private final Boolean kafkaAsync;
>
>    public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic, Boolean kafkaAsync) {
>       this.pool = pool;
>       this.topic = topic;
>       this.kafkaAsync = kafkaAsync;
>    }
>
>    @Override
>    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>       stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>             new RDDKafkaWriter(pool,kafkaAsync), topic));
>    }
>
>    private class PartitionVoidFunction implements
>          VoidFunction<Iterator<String>> {
>
>       private static final long serialVersionUID = 8726871215617446598L;
>       private final RDDKafkaWriter kafkaWriter;
>       private final String topic;
>
>       public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>          this.kafkaWriter = kafkaWriter;
>          this.topic = topic;
>       }
>
>       @Override
>       public void call(Iterator<String> iterator) throws Exception {
>          while (iterator.hasNext()) {
>             kafkaWriter.writeToKafka(topic, iterator.next());
>          }
>       }
>    }
> }
>
>
> The RDDKafkaWriter is here:
>
>
> import java.io.Serializable;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import scala.Tuple2;
>
> public class RDDKafkaWriter implements Serializable {
>
>    private static final long serialVersionUID = 7374381310562055607L;
>    private final KafkaProducerPool pool;
>    private final Boolean async;
>
>    public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>       this.pool = pool;
>       this.async = async;
>
>    }
>
>    public void writeToKafka(String topic, Tuple2<String, String> message) {
>       KafkaProducer<String, String> producer = pool.borrowProducer();
>       ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>             topic, message._1(), message._2());
>       if (async) {
>          producer.send(record);
>       } else {
>          try {
>             producer.send(record).get();
>          } catch (Exception e) {
>             e.printStackTrace();
>          }
>       }
>       pool.returnProducer(producer);
>    }
>
>     public void writeToKafka(String topic, String message) {
>
>         KafkaProducer<String, String> producer = pool.borrowProducer();
>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
>
>         if (async) {
>             producer.send(record);
>         } else {
>             try {
>                 producer.send(record).get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         pool.returnProducer(producer);
>     }
>
>
> }
>
>
>
>
>
> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> Please also include the patch version, such as 1.6.0 or 1.6.1. Could you
> also post the JAVARDDKafkaWriter code? It's possible that it leaks
> resources.
>
> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2...@gmail.com>
> wrote:
>
> It is spark 1.6
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> Could you provide your Spark version please?
>
> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2...@gmail.com>
> wrote:
>
> Hi,
>
> I get a resource leak: the number of open file descriptors in Spark
> Streaming keeps increasing. We eventually hit a "too many open files" error,
> raised through an exception in:
>
> JAVARDDKafkaWriter, which is writing a spark JavaDStream<String>
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> -------------------------------------------
> Time: 1485762530000 ms
> -------------------------------------------
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
> at
> org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
> at
> org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
> at
> org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
> at
> org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:228)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
> at
> org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> ... 3 more
>
>
>
>
>
