This is more of a Java question than a Spark one. You don't "clean up" threads after the fact; rather, restructure your app so that it doesn't create long-running threads that never terminate. Also consider using an Executor instead of creating threads manually.
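A minimal sketch of the Executor idea, under some assumptions: the class below is a hypothetical stand-in that models only the onStart/onStop lifecycle and has no Spark dependency, and the names `ReceiverLifecycleSketch`, `awaitStopped` and the thread name are made up for illustration. The body of `receive()` is a placeholder for the real NSQ polling loop, which would need to check for interruption in a similar way for this to work.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a receiver: only the start/stop lifecycle is modeled.
class ReceiverLifecycleSketch {
    private ExecutorService executor;

    // Called each time the receiver is (re)started.
    public synchronized void onStart() {
        onStop(); // make sure a previous worker is not left running
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "nsq-receiver-sketch");
            t.setDaemon(true); // do not keep the JVM alive for this worker
            return t;
        });
        executor.submit(this::receive);
    }

    // Called when the receiver is stopped: interrupts the worker thread.
    public synchronized void onStop() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    // Waits for the worker to finish; returns false on timeout or interruption.
    public boolean awaitStopped(long timeoutMillis) {
        ExecutorService current;
        synchronized (this) {
            current = executor;
        }
        if (current == null) {
            return true;
        }
        try {
            return current.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Placeholder receive loop: exits as soon as the thread is interrupted.
    private void receive() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(50); // stands in for a blocking read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop ends
            }
        }
    }

    public static void main(String[] args) {
        ReceiverLifecycleSketch receiver = new ReceiverLifecycleSketch();
        receiver.onStart();
        receiver.onStop();
        System.out.println("worker stopped: " + receiver.awaitStopped(1000));
    }
}
```

The point of the design is that every start has a matching stop that ends the worker, so a restarted receiver does not leave the previous thread behind.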
On Mon, Oct 31, 2016 at 7:20 PM kant kodali <kanth...@gmail.com> wrote:

> Hi Ryan,
>
> Ahh, my Receiver.onStop method is currently empty.
>
> 1) I have a hard time seeing why the receiver would crash so many times
> within a span of 4 to 5 hours, but anyway I understand I should still
> clean up during onStop.
>
> 2) How do I clean up those threads? The documentation here
> https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html doesn't seem
> to have any method where I can clean up the threads created during onStart.
> Any ideas?
>
> Thanks!
>
> On Mon, Oct 31, 2016 at 11:58 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
> So in your code, each Receiver will start a new thread. Did you stop the
> receiver properly in `Receiver.onStop`? Otherwise, you may leak threads
> after a receiver crashes and is restarted by Spark. However, this may not be
> the root cause, since the leaked threads are on the driver side. Could you use
> `jstack` to check which types of threads are leaking?
>
> On Mon, Oct 31, 2016 at 11:50 AM, kant kodali <kanth...@gmail.com> wrote:
>
> I am also under the assumption that the *onStart* function of the Receiver is
> called only once by Spark. Please correct me if I am wrong.
>
> On Mon, Oct 31, 2016 at 11:35 AM, kant kodali <kanth...@gmail.com> wrote:
>
> My driver program runs a Spark Streaming job, and it spawns a thread by
> itself only in the *onStart()* function below. Other than that, it doesn't
> spawn any other threads. It only calls the mapToPair, reduceByKey, foreachRDD
> and collect functions.
>
> public class NSQReceiver extends Receiver<String> {
>
>     private String topic = "";
>
>     public NSQReceiver(String topic) {
>         super(StorageLevel.MEMORY_AND_DISK_2());
>         this.topic = topic;
>     }
>
>     @Override
>     public void onStart() {
>         new Thread() {
>             @Override
>             public void run() {
>                 receive();
>             }
>         }.start();
>     }
>
> }
>
> Environment info:
>
> Java 8
> Scala 2.11.8
> Spark 2.0.0
>
> More than happy to share any other info you may need.
>
> On Mon, Oct 31, 2016 at 11:05 AM, Jakob Odersky <ja...@odersky.com> wrote:
>
> > how do I tell my spark driver program to not create so many?
>
> This may depend on your driver program. Do you spawn any threads in
> it? Could you share some more information on the driver program, Spark
> version and your environment? It would greatly help others to help you.
>
> On Mon, Oct 31, 2016 at 3:47 AM, kant kodali <kanth...@gmail.com> wrote:
>
> > The source of my problem is actually that I am running into the following
> > error. This error seems to happen after running my driver program for 4
> > hours.
> >
> > "Exception in thread "ForkJoinPool-50-worker-11" Exception in thread
> > "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13"
> > java.lang.OutOfMemoryError: unable to create new native thread"
> >
> > and this wonderful book taught me that the error "unable to create new
> > native thread" can happen because the JVM asks the OS for a thread and the
> > OS refuses, for one of the following reasons:
> >
> > 1. The system has actually run out of virtual memory.
> > 2. On Unix-style systems, the user has already created (between all programs
> >    the user is running) the maximum number of processes configured for that
> >    user login. Individual threads are considered a process in that regard.
> >
> > Option #2 is ruled out in my case because my driver program is running
> > with a userid of root, which has the maximum number of processes set to
> > 120242.
> >
> > ulimit -a gives me the following:
> >
> > core file size          (blocks, -c) 0
> > data seg size           (kbytes, -d) unlimited
> > scheduling priority             (-e) 0
> > file size               (blocks, -f) unlimited
> > pending signals                 (-i) 120242
> > max locked memory       (kbytes, -l) 64
> > max memory size         (kbytes, -m) unlimited
> > open files                      (-n) 1024
> > pipe size            (512 bytes, -p) 8
> > POSIX message queues     (bytes, -q) 819200
> > real-time priority              (-r) 0
> > stack size              (kbytes, -s) 8192
> > cpu time               (seconds, -t) unlimited
> > max user processes              (-u) 120242
> > virtual memory          (kbytes, -v) unlimited
> > file locks                      (-x) unlimited
> >
> > So at this point I do understand that I am running out of memory due to
> > the allocation of threads, so my biggest question is: how do I tell my
> > Spark driver program to not create so many?
> >
> > On Mon, Oct 31, 2016 at 3:25 AM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> ps -L [pid] is what shows threads. I am not sure this is counting what you
> >> think it does. My shell process has about a hundred threads, and I can't
> >> imagine why one would have thousands unless your app spawned them.
> >>
> >> On Mon, Oct 31, 2016 at 10:20 AM kant kodali <kanth...@gmail.com> wrote:
> >>>
> >>> when I do
> >>>
> >>> ps -elfT | grep "spark-driver-program.jar" | wc -l
> >>>
> >>> The result is around 32K. Why does it create so many threads, and how
> >>> can I limit this?
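
As a rough complement to the `jstack` and `ps -L` suggestions in the thread, the sketch below counts the live threads of the current JVM grouped by name, which can hint at which kind of thread is piling up. It is only an illustration: the class and method names are hypothetical, it must run inside the JVM being inspected (for example, called periodically from the driver), and the grouping rule (strip a trailing number from the thread name) is an assumption that may not suit every naming scheme.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: counts live threads in this JVM, grouped by name prefix.
class ThreadCensusSketch {

    // Drops a trailing number (and its separator) so that, for example,
    // "ForkJoinPool-50-worker-11" and "ForkJoinPool-50-worker-13" share a group.
    static String groupKey(String threadName) {
        return threadName.replaceAll("[-#\\s]*\\d+$", "");
    }

    // Returns a sorted map from group name to number of live threads.
    static Map<String, Integer> countByGroup() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupKey(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // One line per group: count, then group name.
        countByGroup().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

A group whose count keeps growing between two calls is a likely candidate for the leak.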