It's a producer pool: borrowProducer() hands out an existing Kafka producer object if one is free, or creates a new one if all of them are in use. Shouldn't we re-use Kafka producer objects for writing to Kafka?
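For context, each partition ends up doing roughly the following per message (a trimmed sketch of the attached classes, showing only the synchronous path, with the return moved into a finally block so a failed send cannot leak the borrowed producer):

// Runs on the executor for each partition; "pool" and "topic" are the fields
// from the writer classes attached below.
public void call(Iterator<String> messages) throws Exception {
    while (messages.hasNext()) {
        // Hand out a free producer, or create one if the pool is empty.
        KafkaProducer<String, String> producer = pool.borrowProducer();
        try {
            producer.send(new ProducerRecord<String, String>(topic, messages.next())).get();
        } finally {
            // Always give the producer back, even if the send failed.
            pool.returnProducer(producer);
        }
    }
}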
@ryan - can you suggest a good solution for writing a DStream to Kafka that can be used in production? I am attaching the KafkaProducerPool class; where would one issue a call to close()?

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaProducerPool implements Serializable {

    private static final long serialVersionUID = -1913028296093224674L;

    private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;
    private ScheduledExecutorService executorService;
    private final Properties properties;
    private final int minIdle;

    /**
     * Creates the pool.
     *
     * @param minIdle
     *            minimum number of objects residing in the pool
     */
    public KafkaProducerPool(final int minIdle, final Properties properties) {
        // initialize pool
        this.properties = properties;
        this.minIdle = minIdle;
        initialize();
    }

    /**
     * Creates the pool.
     *
     * @param minIdle
     *            minimum number of objects residing in the pool
     * @param maxIdle
     *            maximum number of objects residing in the pool
     * @param validationInterval
     *            time in seconds for periodical checking of minIdle / maxIdle
     *            conditions in a separate thread. When the number of objects is
     *            less than minIdle, missing instances will be created. When the
     *            number of objects is greater than maxIdle, too many instances
     *            will be removed.
     */
    public KafkaProducerPool(final int minIdle, final int maxIdle,
            final long validationInterval, final Properties properties) {
        // initialize pool
        this.properties = properties;
        this.minIdle = minIdle;
        initialize();

        // check pool conditions in a separate thread
        executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                int size = pool.size();
                if (size < minIdle) {
                    int sizeToBeAdded = minIdle - size;
                    for (int i = 0; i < sizeToBeAdded; i++) {
                        pool.add(createProducer());
                    }
                } else if (size > maxIdle) {
                    int sizeToBeRemoved = size - maxIdle;
                    for (int i = 0; i < sizeToBeRemoved; i++) {
                        pool.poll();
                    }
                }
            }
        }, validationInterval, validationInterval, TimeUnit.SECONDS);
    }

    /**
     * Gets the next free object from the pool. If the pool doesn't contain any
     * objects, a new object will be created and given back to the caller of
     * this method.
     *
     * @return a borrowed producer
     */
    public synchronized KafkaProducer<String, String> borrowProducer() {
        if (pool == null)
            initialize();
        KafkaProducer<String, String> object;
        if ((object = pool.poll()) == null) {
            object = createProducer();
        }
        return object;
    }

    /**
     * Returns an object back to the pool.
     *
     * @param producer
     *            object to be returned
     */
    public void returnProducer(KafkaProducer<String, String> producer) {
        if (producer == null) {
            return;
        }
        this.pool.offer(producer);
    }

    /**
     * Shuts down this pool.
     */
    public void shutdown() {
        if (executorService != null) {
            KafkaProducer<String, String> producer;
            while ((producer = pool.poll()) != null) {
                producer.close();
            }
            executorService.shutdown();
        }
    }

    /**
     * Creates a new producer.
     *
     * @return a new producer
     */
    private KafkaProducer<String, String> createProducer() {
        return new KafkaProducer<String, String>(properties);
    }

    private void initialize() {
        pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
        for (int i = 0; i < minIdle; i++) {
            pool.add(createProducer());
        }
    }

    public void closeAll() {
        KafkaProducer<String, String> object;
        while ((object = pool.poll()) != null) {
            // object.flush();
            object.close();
        }
    }
}

Thanks
Nipun

On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Looks like you create KafkaProducerPool in the driver. So when the task is
> running in the executor, it will always see a new empty KafkaProducerPool
> and create KafkaProducers. But nobody closes these KafkaProducers.
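If the pool really is being re-created inside each task, would dropping the serialized pool and creating a single producer lazily per executor JVM (closed from a JVM shutdown hook) be the right direction? A rough sketch of what I mean (KafkaProducerHolder is a hypothetical helper, not part of the attached code):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical helper: one KafkaProducer per executor JVM, created lazily the
// first time a task on that executor needs it.
public class KafkaProducerHolder {

    private static KafkaProducer<String, String> producer;

    public static synchronized KafkaProducer<String, String> getOrCreate(Properties props) {
        if (producer == null) {
            producer = new KafkaProducer<String, String>(props);
            // Flush pending records and release sockets/file descriptors on JVM exit.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    producer.close();
                }
            });
        }
        return producer;
    }
}

Tasks would then call KafkaProducerHolder.getOrCreate(props) inside foreachPartition (the Properties can travel in the closure or in a broadcast variable) instead of borrowing from a pool instance that was serialized from the driver.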
> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2...@gmail.com> wrote:
>
> Sorry for not writing the patch number; it's Spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need any more details.
>
> Thanks
> Nipun
>
> The JavaRDDStringKafkaWriter code is here inline:
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.VoidFunction;
> import scala.Tuple2;
>
> import java.io.Serializable;
> import java.util.Iterator;
>
> public class JavaRDDStringKafkaWriter implements Serializable,
>         VoidFunction<JavaRDD<String>> {
>
>     private static final long serialVersionUID = -865193912367180261L;
>     private final KafkaProducerPool pool;
>     private final String topic;
>     private final Boolean kafkaAsync;
>
>     public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic, Boolean kafkaAsync) {
>         this.pool = pool;
>         this.topic = topic;
>         this.kafkaAsync = kafkaAsync;
>     }
>
>     @Override
>     public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>         stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>                 new RDDKafkaWriter(pool, kafkaAsync), topic));
>     }
>
>     private class PartitionVoidFunction implements VoidFunction<Iterator<String>> {
>
>         private static final long serialVersionUID = 8726871215617446598L;
>         private final RDDKafkaWriter kafkaWriter;
>         private final String topic;
>
>         public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>             this.kafkaWriter = kafkaWriter;
>             this.topic = topic;
>         }
>
>         @Override
>         public void call(Iterator<String> iterator) throws Exception {
>             while (iterator.hasNext()) {
>                 kafkaWriter.writeToKafka(topic, iterator.next());
>             }
>         }
>     }
> }
>
> The RDDKafkaWriter is here:
>
> import java.io.Serializable;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import scala.Tuple2;
>
> public class RDDKafkaWriter implements Serializable {
>
>     private static final long serialVersionUID = 7374381310562055607L;
>     private final KafkaProducerPool pool;
>     private final Boolean async;
>
>     public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>         this.pool = pool;
>         this.async = async;
>     }
>
>     public void writeToKafka(String topic, Tuple2<String, String> message) {
>         KafkaProducer<String, String> producer = pool.borrowProducer();
>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>                 topic, message._1(), message._2());
>         if (async) {
>             producer.send(record);
>         } else {
>             try {
>                 producer.send(record).get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         pool.returnProducer(producer);
>     }
>
>     public void writeToKafka(String topic, String message) {
>         KafkaProducer<String, String> producer = pool.borrowProducer();
>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
>         if (async) {
>             producer.send(record);
>         } else {
>             try {
>                 producer.send(record).get();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         pool.returnProducer(producer);
>     }
> }
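> This is roughly how they are wired into the streaming job (simplified: "lines"
> stands for the JavaDStream<String> being written out, and the pool size, Kafka
> properties, and topic name are placeholders):
>
> // Driver side: the pool is constructed here and captured by the writer,
> // which is what serializes it out to the executors.
> KafkaProducerPool pool = new KafkaProducerPool(2, kafkaProperties);
> lines.foreachRDD(new JavaRDDStringKafkaWriter(pool, "output-topic", false));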
> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> Please also include the patch version, such as 1.6.0, 1.6.1. Could you also
> post the JavaRDDKafkaWriter code? It's also possible that it leaks resources.
>
> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2...@gmail.com> wrote:
>
> It is Spark 1.6.
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> Could you provide your Spark version please?
>
> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2...@gmail.com> wrote:
>
> Hi,
>
> I am seeing a resource leak where the number of open file descriptors in
> Spark Streaming keeps increasing. We eventually end up with a "too many open
> files" error, raised from JavaRDDKafkaWriter, which writes a Spark
> JavaDStream<String> to Kafka.
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> -------------------------------------------
> Time: 1485762530000 ms
> -------------------------------------------
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
> at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:228)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
> at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> ... 3 more