I think you did a good job of summarizing the terminology and describing Spark's operation. However, #7 is inaccurate if I am interpreting it correctly. The scheduler schedules X tasks from the current stage across all executors, where X is the number of cores assigned to the application (assuming only this stage is running). `resourceOfferSingleTaskSet` in TaskSchedulerImpl gives an idea of how tasks from a stage's task set are launched based on the cores currently available across all executors: https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L220. This is what I have observed in all of our Spark Standalone clusters. In fact, I just ran a job against my laptop "cluster" of 1 executor with 8 partitions in a stage. With `spark.cores.max` set to 4, it ran 4 tasks concurrently, launching new tasks as previous ones finished.
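For reference, here is a minimal sketch of the kind of experiment I mean (the object name and the sleep-based workload are made up for illustration, not the actual job I ran; the point is only that concurrency is bounded by the total cores granted to the application, not by one task per stage per executor):

    import org.apache.spark.{SparkConf, SparkContext}

    // Toy job: one stage of 8 tasks in an application capped at 4 cores.
    // Expect 4 tasks running at any time, the rest scheduled as cores free up.
    object CoreBoundedParallelism {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("core-bounded-parallelism")
          .set("spark.cores.max", "4") // total cores granted to this app
        val sc = new SparkContext(conf) // master URL supplied via spark-submit

        val processed = sc.parallelize(1 to 8, 8) // 8 partitions -> 8 tasks
          .mapPartitions { iter =>
            Thread.sleep(5000) // stand-in for slow per-partition work
            iter
          }
          .count()

        println(s"processed $processed records")
        sc.stop()
      }
    }

Watching the stage in the UI (or the executor logs) while this runs should show 4 tasks in the RUNNING state at a time.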
Also, #8 is only true if you set up fair scheduling at the sub-job level. (There are two kinds of fair scheduling that I've seen, intra-job and sub-job: `spark.scheduler.mode` enables intra-job fair scheduling, but to get sub-job fair scheduling you need to provide a pool configuration file and assign jobs to pools via `spark.scheduler.pool`.)

On Mon, Aug 3, 2015 at 4:16 PM, Ajay Singal <asinga...@gmail.com> wrote:

> Hi Sujit,
>
> From experimenting with Spark (and other documentation), my understanding is as follows:
>
> 1. Each application consists of one or more Jobs.
> 2. Each Job has one or more Stages.
> 3. Each Stage creates one or more Tasks (normally, one Task per Partition).
> 4. The Master allocates one Executor per Worker (that contains a Partition) per Application.
> 5. The Executor stays up for the lifetime of the Application (and dies when the Application ends).
> 6. Each Executor can run multiple Tasks in parallel (normally, the parallelism depends on the number of cores per Executor).
> 7. The Scheduler schedules only one Task from each Stage to one Executor.
> 8. If there are multiple Stages (from a Job) and these Stages could be run asynchronously (i.e., in parallel), one Task from each Stage could be scheduled on the same Executor (thus this Executor runs multiple Tasks in parallel: see #6 above).
>
> Of course, there could be many exceptions/exclusions to what I explained above. I expect that the Spark community will confirm or correct my observations/understanding above.
>
> Now, let's come back to your situation. You have a cluster of 4 Workers with 10 Partitions. All of these 10 Partitions are distributed among these 4 Workers. Also, from the information you provided, your Application has just one Job with two Stages (repartition and mapPartitions). The mapPartitions Stage will have 10 Tasks. Assuming my observations/understanding is correct, by virtue of #7 above, only 4 Tasks can be executed in parallel. The subsequent Jobs will have to wait.
>
> However, if you had 10 or more Workers, all Tasks would have been executed in parallel. BTW, I believe you can have multiple Workers on one physical node, so one of the solutions to your problem would be to increase the number of Workers.
>
> Having said so, I believe #7 above is the bottleneck. If there is no good reason for keeping this bottleneck, this could be a good area of improvement (and needs to be addressed by the Spark community). I will wait for the community response and, if needed, I will open a JIRA item.
>
> I hope it helps.
>
> Regards,
>
> Ajay
>
> On Mon, Aug 3, 2015 at 1:16 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>
>> @Silvio: the mapPartitions instantiates an HttpSolrServer, then for each query string in the partition, sends the query to Solr using SolrJ and gets back the top N results. It then reformats the result data into one long string and returns the key-value pair as (query string, result string).
>>
>> @Igor: Thanks for the parameter suggestions. I will check the --num-executors and whether there is a way to set the number of cores per executor with my Databricks admin, and update here if I find it, but from the Databricks console, it appears that the number of executors per box is 1.
>> This seems normal though, per the diagram on this page:
>>
>> http://spark.apache.org/docs/latest/cluster-overview.html
>>
>> where it seems that there is 1 executor per box, and each executor can spawn multiple threads to take care of multiple tasks (see bullet #1, copied below).
>>
>>> Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs).
>>
>> Regarding hitting the max number of requests, thanks for the link. I am using the default client. I just peeked at the Solr code, and the default (if no HttpClient instance is supplied in the ctor) is to use DefaultHttpClient (from HttpComponents), whose settings are as follows:
>>
>>> - Version: HttpVersion.HTTP_1_1
>>> - ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
>>> - NoTcpDelay: true
>>> - SocketBufferSize: 8192
>>> - UserAgent: Apache-HttpClient/release (java 1.5)
>>
>> In addition, the Solr code sets the following config parameters on the DefaultHttpClient:
>>
>>> params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
>>> params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
>>> params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
>>
>> Since all my connections are coming out of 2 worker boxes, it looks like I could get 32x2 = 64 clients hitting Solr, right?
>>
>> @Steve: Thanks for the link to the HttpClient config. I was thinking about using a thread pool (or better, using a PoolingHttpClientManager per the docs), but it probably won't help since it's still being fed one request at a time.
>>
>> @Abhishek: my observations agree with what you said. In the past I have had success with repartition to reduce the partition size, especially when groupBy operations were involved. But I believe an executor should be able to handle multiple tasks in parallel, from what I understand about Akka, on which Spark is built - the worker is essentially an ActorSystem which can contain multiple Actors; each Actor works on a queue of tasks. Within an Actor everything is sequential, but the ActorSystem is responsible for farming out the tasks it gets to each of its Actors. Although it is possible I am generalizing incorrectly from my limited experience with Akka.
>>
>> Thanks again for all your help. Please let me know if something jumps out and/or if there is some configuration I should check.
>>
>> -sujit
>>
>> On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh <abhis...@tetrationanalytics.com> wrote:
>>
>>> I don't know if (your assertion/expectation that) workers will process things (multiple partitions) in parallel is really valid, or if having more partitions than workers will necessarily help (unless you are memory bound - so partitioning is essentially helping your work size rather than execution parallelism).
>>>
>>> [Disclaimer: I am no authority on Spark, but wanted to throw in my spin based on my own understanding.]
>>>
>>> Nothing official about it :)
>>>
>>> -abhishek-
>>>
>>> On Jul 31, 2015, at 1:03 PM, Sujit Pal <sujitatgt...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to run a Spark job that hits an external webservice to get back some information.
>>> The cluster is 1 master + 4 workers; each worker has 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server, and it is accessed using code similar to that shown below.
>>>
>>>> def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
>>>>   val solr = new HttpSolrClient()
>>>>   initializeSolrParameters(solr)
>>>>   keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>>>> }
>>>>
>>>> myRDD.repartition(10)
>>>>   .mapPartitions(keyValues => getResults(keyValues))
>>>
>>> The mapPartitions does some initialization of the SolrJ client per partition and then hits it for each record in the partition via the getResults() call.
>>>
>>> I repartitioned in the hope that this would result in 10 clients hitting Solr simultaneously (I would like to go up to maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using `netstat -anp | grep ":8983.*ESTABLISHED"` in a loop on the Solr box and observed that Solr has a constant 4 clients (i.e., equal to the number of workers) over the lifetime of the run.
>>>
>>> My observation leads me to believe that each worker processes a single stream of work sequentially. However, from what I understand about how Spark works, each worker should be able to process a number of tasks in parallel, and repartition() is a hint for it to do so.
>>>
>>> Is there some SparkConf setting I should use to increase parallelism in these workers, or should I just configure a cluster with multiple workers per machine? Or is there something I am doing wrong?
>>>
>>> Thank you in advance for any pointers you can provide.
>>>
>>> -sujit

--
Richard Marscher
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> | Facebook <http://facebook.com/localytics> | LinkedIn <http://www.linkedin.com/company/1148792?trk=tyah>
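P.S. To make the sub-job fair scheduling point at the top of this message concrete, here is a rough, untested sketch (the pool name and file path are made up; the properties are the ones documented in the Spark job scheduling guide, and the allocation file defines pools with name, schedulingMode, weight, and minShare in XML):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sub-job (pool-based) fair scheduling: enable FAIR mode and point at a
    // pool allocation file, then assign jobs to a pool per thread.
    val conf = new SparkConf()
      .setAppName("fair-pools-sketch")
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go to the "solrQueries" pool (a made-up
    // pool name defined in the allocation file); other threads can use other
    // pools and will be scheduled fairly against each other.
    sc.setLocalProperty("spark.scheduler.pool", "solrQueries")
    // ... run jobs ...
    sc.setLocalProperty("spark.scheduler.pool", null) // revert to the default pool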